Skip to content

Commit

Permalink
test: queue_perf: add option for using private queues
Browse files Browse the repository at this point in the history
Add a new option ('-p') for using private queues per worker. In private queue
mode, each worker thread operates on its own separate queues, which can improve
performance scaling.

Signed-off-by: Matias Elo <[email protected]>
Reviewed-by: Tuomas Taipale <[email protected]>
  • Loading branch information
MatiasElo committed Nov 1, 2024
1 parent 44d0b1c commit 3889ea3
Showing 1 changed file with 158 additions and 50 deletions.
208 changes: 158 additions & 50 deletions test/performance/odp_queue_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/**
* @example odp_queue_perf.c
*
* Performance test application for queue APIs
* Performance test application for plain queues
*
* @cond _ODP_HIDE_FROM_DOXYGEN_
*/
Expand All @@ -30,9 +30,10 @@ typedef struct test_options_t {
uint32_t num_event;
uint32_t num_round;
uint32_t max_burst;
uint32_t num_cpu;
odp_nonblocking_t nonblock;
int single;
int num_cpu;
odp_bool_t private_queues;
odp_bool_t single;

} test_options_t;

Expand All @@ -46,6 +47,18 @@ typedef struct test_stat_t {

} test_stat_t;

typedef struct test_global_t test_global_t;

/* Per-worker argument block. One instance per worker is stored in
 * test_global_t and a pointer to it is passed to run_test() as the thread
 * argument. */
typedef struct {
/* Back-pointer to the test's global state */
test_global_t *global;
/* Barrier that run_test() waits on so all workers start at the same time */
odp_barrier_t *barrier;
/* Test options; set to point at the options stored in the global state */
test_options_t *options;
/* Results written by this worker in run_test() and summed up afterwards
 * in output_results() */
test_stat_t stats;
/* Indexes into global->queue[] of the queues this worker dequeues from */
uint32_t src_queue_id[MAX_QUEUES];
/* Indexes into global->queue[] of the queues this worker enqueues to.
 * Entry N is paired with src_queue_id[N]. */
uint32_t dst_queue_id[MAX_QUEUES];
/* Number of valid entries in src_queue_id[] and dst_queue_id[] */
uint32_t num_queues;
} thread_args_t;

typedef struct test_global_t {
odp_barrier_t barrier;
test_options_t options;
Expand All @@ -54,7 +67,7 @@ typedef struct test_global_t {
odp_pool_t pool;
odp_queue_t queue[MAX_QUEUES];
odph_thread_t thread_tbl[ODP_THREAD_COUNT_MAX];
test_stat_t stat[ODP_THREAD_COUNT_MAX];
thread_args_t thread_args[ODP_THREAD_COUNT_MAX];
test_common_options_t common_options;

} test_global_t;
Expand All @@ -66,14 +79,15 @@ static void print_usage(void)
"\n"
"Usage: odp_queue_perf [options]\n"
"\n"
" -c, --num_cpu Number of worker threads. Default: 1\n"
" -q, --num_queue Number of queues. Default: 1\n"
" -e, --num_event Number of events per queue. Default: 1\n"
" -b, --burst_size Maximum number of events per operation. Default: 1\n"
" -c, --num_cpu Number of worker threads (default 1)\n"
" -q, --num_queue Number of queues (default 1)\n"
" -e, --num_event Number of events per queue (default 1)\n"
" -b, --burst_size Maximum number of events per operation (default 1)\n"
" -p, --private Use separate queues for each worker\n"
" -r, --num_round Number of rounds\n"
" -l, --lockfree Lockfree queues\n"
" -w, --waitfree Waitfree queues\n"
" -s, --single Single producer, single consumer\n"
" -l, --lockfree Lock-free queues\n"
" -w, --waitfree Wait-free queues\n"
" -s, --single Single producer/consumer queues\n"
" -h, --help This help\n"
"\n");
}
Expand All @@ -89,6 +103,7 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{"num_queue", required_argument, NULL, 'q'},
{"num_event", required_argument, NULL, 'e'},
{"burst_size", required_argument, NULL, 'b'},
{"private", no_argument, NULL, 'p'},
{"num_round", required_argument, NULL, 'r'},
{"lockfree", no_argument, NULL, 'l'},
{"waitfree", no_argument, NULL, 'w'},
Expand All @@ -97,15 +112,16 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{NULL, 0, NULL, 0}
};

static const char *shortopts = "+c:q:e:b:r:lwsh";
static const char *shortopts = "+c:q:e:b:pr:lwsh";

test_options->num_cpu = 1;
test_options->num_queue = 1;
test_options->num_event = 1;
test_options->max_burst = 1;
test_options->num_round = 1000;
test_options->nonblock = ODP_BLOCKING;
test_options->single = 0;
test_options->single = false;
test_options->private_queues = false;

while (1) {
opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
Expand Down Expand Up @@ -135,8 +151,11 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
case 'w':
test_options->nonblock = ODP_NONBLOCKING_WF;
break;
case 'p':
test_options->private_queues = true;
break;
case 's':
test_options->single = 1;
test_options->single = true;
break;
case 'h':
/* fall through */
Expand All @@ -153,11 +172,26 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
return -1;
}

if (test_options->single && test_options->num_cpu != 1) {
if (test_options->single && !test_options->private_queues && test_options->num_cpu != 1) {
ODPH_ERR("Using single prod/cons queue(s) with multiple workers not supported.\n");
return -1;
}

if (test_options->private_queues) {
int num_cpu = test_options->num_cpu;

if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if ((int)test_options->num_queue < num_cpu) {
ODPH_ERR("Not enough queues for %d workers.\n", num_cpu);
return -1;
}
if (test_options->num_queue % num_cpu)
ODPH_ERR("Warn: %" PRIu32 " queues shared unevenly amongst %" PRIu32 " "
"workers.\n", test_options->num_queue, num_cpu);
}

return ret;
}

Expand All @@ -183,7 +217,8 @@ static int create_queues(test_global_t *global)
nonblock == ODP_BLOCKING ? "NORMAL" :
(nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
(nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
printf(" single prod/cons %d\n", test_options->single);
printf(" private queues %s\n", test_options->private_queues ? "yes" : "no");
printf(" single prod/cons %s\n", test_options->single ? "yes" : "no");
printf(" num rounds %u\n", num_round);
printf(" num queues %u\n", num_queue);
printf(" num events per queue %u\n", num_event);
Expand Down Expand Up @@ -374,25 +409,28 @@ static int run_test(void *arg)
odp_time_t t1, t2;
uint32_t rounds;
int num_ev;
test_stat_t *stat;
test_global_t *global = arg;
test_options_t *test_options = &global->options;
odp_queue_t queue;
thread_args_t *thr_args = arg;
test_global_t *global = thr_args->global;
test_stat_t *stat = &thr_args->stats;
odp_queue_t src_queue, dst_queue;
uint64_t num_deq_retry = 0;
uint64_t num_enq_retry = 0;
uint64_t events = 0;
uint32_t num_queue = test_options->num_queue;
uint32_t num_round = test_options->num_round;
int thr = odp_thread_id();
int ret = 0;
uint32_t i = 0;
uint32_t max_burst = test_options->max_burst;
const uint32_t num_queue = thr_args->num_queues;
const uint32_t num_round = thr_args->options->num_round;
const uint32_t max_burst = thr_args->options->max_burst;
uint32_t queue_idx = 0;
odp_event_t ev[max_burst];
odp_queue_t src_queue_tbl[MAX_QUEUES];
odp_queue_t dst_queue_tbl[MAX_QUEUES];

stat = &global->stat[thr];
for (uint32_t i = 0; i < num_queue; i++) {
src_queue_tbl[i] = global->queue[thr_args->src_queue_id[i]];
dst_queue_tbl[i] = global->queue[thr_args->dst_queue_id[i]];
}

/* Start all workers at the same time */
odp_barrier_wait(&global->barrier);
odp_barrier_wait(thr_args->barrier);

t1 = odp_time_local_strict();
c1 = odp_cpu_cycles();
Expand All @@ -401,12 +439,14 @@ static int run_test(void *arg)
int num_enq = 0;

do {
queue = global->queue[i++];
src_queue = src_queue_tbl[queue_idx];
dst_queue = dst_queue_tbl[queue_idx];

if (i == num_queue)
i = 0;
queue_idx++;
if (queue_idx == num_queue)
queue_idx = 0;

num_ev = odp_queue_deq_multi(queue, ev, max_burst);
num_ev = odp_queue_deq_multi(src_queue, ev, max_burst);

if (odp_unlikely(num_ev < 0))
ODPH_ABORT("odp_queue_deq_multi() failed\n");
Expand All @@ -417,7 +457,7 @@ static int run_test(void *arg)
} while (num_ev == 0);

while (num_enq < num_ev) {
int num = odp_queue_enq_multi(queue, &ev[num_enq], num_ev - num_enq);
int num = odp_queue_enq_multi(dst_queue, &ev[num_enq], num_ev - num_enq);

if (odp_unlikely(num < 0))
ODPH_ABORT("odp_queue_enq_multi() failed\n");
Expand All @@ -443,13 +483,76 @@ static int run_test(void *arg)
stat->deq_retry = num_deq_retry;
stat->enq_retry = num_enq_retry;

return ret;
return 0;
}

/*
 * Fill in the per-worker source/destination queue index tables.
 *
 * Shared mode (private_queues not set): every worker is given all
 * 'num_queue' queues in index order.
 *
 * Private mode: queues are dealt out round-robin, i.e. queue i is given to
 * worker (i % num_cpu). Workers may end up with a different number of queues
 * when num_queue is not a multiple of num_cpu.
 *
 * In both modes the source and destination index of a table entry refer to
 * the same queue.
 *
 * The mapping is always rebuilt from scratch, so calling this function more
 * than once yields the same result instead of appending to the tables.
 */
static void map_queues_to_threads(test_global_t *global)
{
	const test_options_t *opt = &global->options;

	/* No workers to map queues to. Bail out before the round-robin
	 * modulo below would divide by zero. */
	if (opt->num_cpu == 0)
		return;

	/* Start from empty tables so that a repeated call cannot index past
	 * the end of src_queue_id[] / dst_queue_id[] */
	for (uint32_t j = 0; j < opt->num_cpu; j++)
		global->thread_args[j].num_queues = 0;

	if (!opt->private_queues) {
		/* Shared mode: each worker polls every queue */
		for (uint32_t j = 0; j < opt->num_cpu; j++) {
			thread_args_t *thread_args = &global->thread_args[j];

			for (uint32_t i = 0; i < opt->num_queue; i++) {
				thread_args->src_queue_id[i] = i;
				thread_args->dst_queue_id[i] = i;
			}

			thread_args->num_queues = opt->num_queue;
		}
		return;
	}

	/* Private mode: deal the queues out to the workers round-robin */
	for (uint32_t i = 0; i < opt->num_queue; i++) {
		thread_args_t *thread_args = &global->thread_args[i % opt->num_cpu];
		const uint32_t queue_idx = thread_args->num_queues;

		thread_args->src_queue_id[queue_idx] = i;
		thread_args->dst_queue_id[queue_idx] = i;
		thread_args->num_queues = queue_idx + 1;
	}
}

/* Print 'label' followed by 'num' space-separated queue indexes from 'idx' */
static void print_queue_idx_row(const char *label, const uint32_t idx[], uint32_t num)
{
	printf("%s", label);

	for (uint32_t k = 0; k < num; k++)
		printf(" %" PRIu32 "", idx[k]);
}

/*
 * Print, for each of the 'num_cpu' workers, the source and destination queue
 * indexes currently stored in its thread arguments.
 */
static void print_queue_mappings(test_global_t *global)
{
	const thread_args_t *args = global->thread_args;

	printf("Worker-queue mappings\n");
	printf("---------------------\n");

	for (uint32_t worker = 0; worker < global->options.num_cpu; worker++) {
		const uint32_t count = args[worker].num_queues;

		printf("Worker %u:\n", worker);

		print_queue_idx_row(" src queue idx:", args[worker].src_queue_id, count);
		print_queue_idx_row("\n dst queue idx:", args[worker].dst_queue_id, count);
		printf("\n\n");
	}
}

/*
 * Prepare the argument block of every worker: point it back at the shared
 * global state, barrier and options, then build and print the worker-queue
 * mappings.
 */
static void init_thread_args(test_global_t *global)
{
	uint32_t worker = 0;

	while (worker < global->options.num_cpu) {
		thread_args_t *args = &global->thread_args[worker];

		args->options = &global->options;
		args->barrier = &global->barrier;
		args->global = global;
		worker++;
	}

	/* Mappings are computed first, then reported to the user */
	map_queues_to_threads(global);
	print_queue_mappings(global);
}

static int start_workers(test_global_t *global)
{
odph_thread_common_param_t thr_common;
odph_thread_param_t thr_param;
odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX];
odp_cpumask_t cpumask;
int ret;
test_options_t *test_options = &global->options;
Expand All @@ -475,14 +578,17 @@ static int start_workers(test_global_t *global)
odph_thread_common_param_init(&thr_common);
thr_common.instance = global->instance;
thr_common.cpumask = &cpumask;
thr_common.share_param = 1;

odph_thread_param_init(&thr_param);
thr_param.start = run_test;
thr_param.arg = global;
thr_param.thr_type = ODP_THREAD_WORKER;
init_thread_args(global);

for (int i = 0; i < num_cpu; i++) {
odph_thread_param_init(&thr_param[i]);
thr_param[i].start = run_test;
thr_param[i].arg = &global->thread_args[i];
thr_param[i].thr_type = ODP_THREAD_WORKER;
}

if (odph_thread_create(global->thread_tbl, &thr_common, &thr_param,
if (odph_thread_create(global->thread_tbl, &thr_common, thr_param,
num_cpu) != num_cpu)
return -1;

Expand All @@ -493,6 +599,7 @@ static int output_results(test_global_t *global)
{
int i, num;
double rounds_ave, events_ave, nsec_ave, cycles_ave;
test_stat_t *stats;
test_options_t *test_options = &global->options;
int num_cpu = test_options->num_cpu;
uint64_t rounds_sum = 0;
Expand All @@ -504,12 +611,13 @@ static int output_results(test_global_t *global)

/* Averages */
for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
rounds_sum += global->stat[i].rounds;
events_sum += global->stat[i].events;
nsec_sum += global->stat[i].nsec;
cycles_sum += global->stat[i].cycles;
deq_retry_sum += global->stat[i].deq_retry;
enq_retry_sum += global->stat[i].enq_retry;
stats = &global->thread_args[i].stats;
rounds_sum += stats->rounds;
events_sum += stats->events;
nsec_sum += stats->nsec;
cycles_sum += stats->cycles;
deq_retry_sum += stats->deq_retry;
enq_retry_sum += stats->enq_retry;
}

if (rounds_sum == 0) {
Expand All @@ -528,12 +636,12 @@ static int output_results(test_global_t *global)
printf(" 1 2 3 4 5 6 7 8 9 10");

for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) {
if (global->stat[i].rounds) {
stats = &global->thread_args[i].stats;
if (stats->rounds) {
if ((num % 10) == 0)
printf("\n ");

printf("%6.1f ", (1000.0 * global->stat[i].events) /
global->stat[i].nsec);
printf("%6.1f ", (1000.0 * stats->events) / stats->nsec);
num++;
}
}
Expand Down

0 comments on commit 3889ea3

Please sign in to comment.