diff --git a/examples/unix/c11/z_pub_thr.c b/examples/unix/c11/z_pub_thr.c new file mode 100644 index 000000000..d1379c43c --- /dev/null +++ b/examples/unix/c11/z_pub_thr.c @@ -0,0 +1,75 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#if Z_FEATURE_PUBLICATION == 1 +int main(int argc, char **argv) { + if (argc < 2) { + printf("USAGE:\n\tz_pub_thr []\n\n"); + exit(-1); + } + char *keyexpr = "test/thr"; + size_t len = atoi(argv[1]); + uint8_t *value = (uint8_t *)malloc(len); + memset(value, 1, len); + + // Set config + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[2])) < 0) { + printf("Couldn't insert locator in config: %s\n", argv[2]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare publisher + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + + // Send packets + while (1) { + z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL); + } + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c new file mode 100644 index 000000000..dc22659dd --- /dev/null +++ b/examples/unix/c11/z_sub_thr.c @@ -0,0 +1,121 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#define N 1000000 + +typedef struct { + volatile unsigned long count; + volatile unsigned long finished_rounds; + volatile clock_t start; + volatile clock_t stop; + volatile clock_t first_start; +} z_stats_t; + +#if Z_FEATURE_SUBSCRIPTION == 1 + +z_stats_t *z_stats_make() { + z_stats_t *stats = malloc(sizeof(z_stats_t)); + stats->count = 0; + stats->finished_rounds = 0; + stats->first_start = 0; + return stats; +} + +void on_sample(const z_sample_t *sample, void *context) { + z_stats_t *stats = (z_stats_t *)context; + if (stats->count == 0) { + stats->start = clock(); + if (!stats->first_start) { + stats->first_start = stats->start; + } + stats->count++; + } else if (stats->count < N) { + stats->count++; + } else { + stats->stop = clock(); + stats->finished_rounds++; + printf("%f msg/s\n", N * (double)CLOCKS_PER_SEC / (double)(stats->stop - stats->start)); + stats->count = 0; + } +} + +void drop_stats(void *context) { + const clock_t end = clock(); + const z_stats_t *stats = (z_stats_t *)context; + const double elapsed = (double)(end - stats->first_start) / (double)CLOCKS_PER_SEC; + const unsigned long sent_messages = N * stats->finished_rounds + stats->count; + printf("Stats being dropped after unsubscribing: sent %ld messages over %f seconds (%f msg/s)\n", sent_messages, + elapsed, (double)sent_messages / elapsed); + free(context); +} + +int main(int argc, char **argv) { + char *keyexpr = "test/thr"; + z_owned_config_t config = z_config_default(); + + // Set config + if (argc > 1) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[1])) < 0) { + printf("Failed to insert locator in config: %s\n", argv[1]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare Subscriber/resource + z_stats_t *context = z_stats_make(); + z_owned_closure_sample_t callback = z_closure(on_sample, drop_stats, (void *)context); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to create subscriber.\n"); + exit(-1); + } + // Listen until stopped + printf("Start listening.\n"); + char c = 0; + while (c != 'q') { + c = fgetc(stdin); + } + // Wait for everything to settle + printf("End of test\n"); + z_sleep_s(1); + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif