Skip to content

Commit

Permalink
Add fragment test to integration testing (#302)
Browse files Browse the repository at this point in the history
* test: add fragment test to integration testing

* fix: change perf test config

* feat: add throughput examples

* fix: remove test for performance

* fix: use monotonic time for throughput example

* fix: convert type before divide operation
  • Loading branch information
jean-roland authored Dec 20, 2023
1 parent a0b421c commit a952247
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:

- name: Test debug
run: make test
timeout-minutes: 15
env:
BUILD_TYPE: Debug # Workaround for Windows as it seems the previous step is being ignored
BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored
Expand Down
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ if(UNIX)
add_example(z_scout unix/c11/z_scout.c)
add_example(z_ping unix/c11/z_ping.c)
add_example(z_pong unix/c11/z_pong.c)
add_example(z_pub_thr unix/c11/z_pub_thr.c)
add_example(z_sub_thr unix/c11/z_sub_thr.c)
endif()
elseif(MSVC)
add_example(z_put windows/z_put.c)
Expand Down
75 changes: 75 additions & 0 deletions examples/unix/c11/z_pub_thr.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "zenoh-pico.h"

#if Z_FEATURE_PUBLICATION == 1
int main(int argc, char **argv) {
if (argc < 2) {
printf("USAGE:\n\tz_pub_thr <payload-size> [<zenoh-locator>]\n\n");
exit(-1);
}
char *keyexpr = "test/thr";
size_t len = atoi(argv[1]);
uint8_t *value = (uint8_t *)malloc(len);
memset(value, 1, len);

// Set config
z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[2])) < 0) {
printf("Couldn't insert locator in config: %s\n", argv[2]);
exit(-1);
}
}
// Open session
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) {
printf("Unable to start read and lease tasks");
exit(-1);
}
// Declare publisher
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
printf("Unable to declare publisher for key expression!\n");
exit(-1);
}

// Send packets
while (1) {
z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL);
}
// Clean up
z_undeclare_publisher(z_move(pub));
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));
z_close(z_move(s));
free(value);
exit(0);
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n");
return -2;
}
#endif
121 changes: 121 additions & 0 deletions examples/unix/c11/z_sub_thr.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "zenoh-pico.h"

#define PACKET_NB 1000000

typedef struct {
volatile unsigned long count;
volatile unsigned long finished_rounds;
z_clock_t start;
z_clock_t first_start;
} z_stats_t;

#if Z_FEATURE_SUBSCRIPTION == 1

z_stats_t *z_stats_make(void) {
z_stats_t *stats = malloc(sizeof(z_stats_t));
stats->count = 0;
stats->finished_rounds = 0;
stats->first_start.tv_nsec = 0;
return stats;
}

void on_sample(const z_sample_t *sample, void *context) {
(void)sample;
z_stats_t *stats = (z_stats_t *)context;
stats->count++;
// Start set measurement
if (stats->count == 1) {
stats->start = z_clock_now();
if (stats->first_start.tv_nsec == 0) {
stats->first_start = stats->start;
}
} else if (stats->count >= PACKET_NB) {
// Stop set measurement
stats->finished_rounds++;
unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start);
printf("Received %d msg in %lu ms (%.1f msg/s)\n", PACKET_NB, elapsed_ms,
(double)(PACKET_NB * 1000) / (double)elapsed_ms);
stats->count = 0;
}
}

void drop_stats(void *context) {
z_stats_t *stats = (z_stats_t *)context;
unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->first_start);
const unsigned long sent_messages = PACKET_NB * stats->finished_rounds + stats->count;
printf("Stats after unsubscribing: received %ld messages over %lu miliseconds (%.1f msg/s)\n", sent_messages,
elapsed_ms, (double)(sent_messages * 1000) / (double)elapsed_ms);
free(context);
}

int main(int argc, char **argv) {
char *keyexpr = "test/thr";
z_owned_config_t config = z_config_default();

// Set config
if (argc > 1) {
if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[1])) < 0) {
printf("Failed to insert locator in config: %s\n", argv[1]);
exit(-1);
}
}
// Open session
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) {
printf("Unable to start read and lease tasks");
exit(-1);
}
// Declare Subscriber/resource
z_stats_t *context = z_stats_make();
z_owned_closure_sample_t callback = z_closure(on_sample, drop_stats, (void *)context);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to create subscriber.\n");
exit(-1);
}
// Listen until stopped
printf("Start listening.\n");
char c = 0;
while (c != 'q') {
c = fgetc(stdin);
}
// Wait for everything to settle
printf("End of test\n");
z_sleep_s(1);
// Clean up
z_undeclare_subscriber(z_move(sub));
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));
z_close(z_move(s));
exit(0);
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n");
return -2;
}
#endif
35 changes: 34 additions & 1 deletion tests/z_client_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#define MSG 1000
#define MSG_LEN 1024
#define FRAGMENT_MSG_NB 100
#define FRAGMENT_MSG_LEN 100000
#define QRY 100
#define QRY_CLT 10
#define SET 100
Expand Down Expand Up @@ -92,7 +94,7 @@ void data_handler(const z_sample_t *sample, void *arg) {
printf(">> Received data: %s\t(%u/%u)\n", res, datas, total);

z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr);
assert(sample->payload.len == MSG_LEN);
assert((sample->payload.len == MSG_LEN) || (sample->payload.len == FRAGMENT_MSG_LEN));
assert(_z_str_eq(z_loan(k_str), res) == true);

datas++;
Expand Down Expand Up @@ -233,6 +235,37 @@ int main(int argc, char **argv) {

z_sleep_s(SLEEP);

// Write fragment data from first session
if (is_reliable) {
z_free((uint8_t *)payload);
len = FRAGMENT_MSG_LEN;
payload = (uint8_t *)z_malloc(len);
memset(payload, 1, FRAGMENT_MSG_LEN);

total = FRAGMENT_MSG_NB * SET;
for (unsigned int n = 0; n < FRAGMENT_MSG_NB; n++) {
for (unsigned int i = 0; i < SET; i++) {
z_put_options_t opt = z_put_options_default();
opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_put(z_loan(s1), z_loan(rids1[i]), (const uint8_t *)payload, len, &opt);
printf("Wrote fragment data from session 1: %u %zu b\t(%u/%u)\n", z_loan(rids1[i])._id, len,
n * SET + (i + 1), total);
}
}
// Wait to receive all the data
now = z_clock_now();
while (datas < total) {
assert(z_clock_elapsed_s(&now) < TIMEOUT);
printf("Waiting for fragment datas... %u/%u\n", datas, total);
z_sleep_s(SLEEP);
}
if (is_reliable == true) {
assert(datas == total);
}
datas = 0;
z_sleep_s(SLEEP);
}

// Query data from first session
total = QRY * SET;
for (unsigned int n = 0; n < QRY; n++) {
Expand Down
15 changes: 12 additions & 3 deletions tests/z_perf_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,30 @@ void on_sample(const z_sample_t *sample, void *context) {

int main(int argc, char **argv) {
char *keyexpr = "test/thr";
const char *mode = "client";
const char *mode = NULL;
char *llocator = NULL;
char *clocator = NULL;
(void)argv;

// Check if peer mode
// Check if peer or client mode
if (argc > 1) {
mode = "peer";
llocator = "udp/224.0.0.224:7447#iface=lo";
} else {
mode = "client";
clocator = "tcp/127.0.0.1:7447";
}
// Set config
z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (mode != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
}
if (llocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator));
}
if (clocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator));
}
// Open session
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
Expand Down
20 changes: 13 additions & 7 deletions tests/z_perf_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value
z_clock_t test_start = z_clock_now();
unsigned long elapsed_us = 0;
while (elapsed_us < TEST_DURATION_US) {
if (z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL) != 0) {
printf("Put failed for pkt len: %lu\n", pkt_len);
return -1;
}
z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL);
elapsed_us = z_clock_elapsed_us(&test_start);
}
return 0;
Expand All @@ -41,21 +38,30 @@ int main(int argc, char **argv) {
uint8_t *value = (uint8_t *)malloc(len_array[0]);
memset(value, 1, len_array[0]);
char *keyexpr = "test/thr";
const char *mode = "client";
const char *mode = NULL;
char *llocator = NULL;
char *clocator = NULL;
(void)argv;

// Check if peer mode
// Check if peer or client mode
if (argc > 1) {
mode = "peer";
llocator = "udp/224.0.0.224:7447#iface=lo";
} else {
mode = "client";
clocator = "tcp/127.0.0.1:7447";
}
// Set config
z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (mode != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
}
if (llocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator));
}
if (clocator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator));
}
// Open session
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
Expand Down

0 comments on commit a952247

Please sign in to comment.