Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print warnings if configurations could lead to deadlocks or starvation scenarios #279

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/margo-init.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
static void set_argobots_environment_variables(struct json_object* config);
/* confirm if Argobots is running with desired configuration or not */
static void confirm_argobots_configuration(struct json_object* config);
// some sanity checks of the pools/xstreams setup
static bool sanity_check_abt_configuration(margo_abt_t* abt,
int progress_pool_idx);

// Shutdown logic for a margo instance
static void remote_shutdown_ult(hg_handle_t handle);
Expand Down Expand Up @@ -262,6 +265,12 @@
}
}

// sanity check configuration of pools and ES
if (!sanity_check_abt_configuration(&abt, progress_pool_idx)) {
MARGO_ERROR(0, "Configuration did not pass sanity checks");
goto error;

Check warning on line 271 in src/margo-init.c

View check run for this annotation

Codecov / codecov/patch

src/margo-init.c#L270-L271

Added lines #L270 - L271 were not covered by tests
}

// allocate margo instance
MARGO_TRACE(0, "Allocating margo instance");
mid = calloc(1, sizeof(*mid));
Expand Down Expand Up @@ -559,6 +568,104 @@
#undef HANDLE_CONFIG_ERROR
}

static bool sanity_check_abt_configuration(margo_abt_t* abt,
                                           int progress_pool_idx)
{
    // bit flags for each pool:
    // 00000001 = the pool is used as first pool of at least one ES.
    // 00000010 = the pool is used by an ES associated with the progress pool.
    // 00000100 = the pool is used AFTER the progress pool in at least one ES.
    // 00001000 = the pool is used BEFORE the progress pool in at least one ES,
    //            or in an ES that doesn't use the progress pool.
    const uint8_t USED_AS_FIRST   = 0b00000001;
    const uint8_t WITH_PROGRESS   = 0b00000010;
    const uint8_t AFTER_PROGRESS  = 0b00000100;
    const uint8_t BEFORE_PROGRESS = 0b00001000;

    uint8_t* pool_flags = calloc(abt->pools_len, sizeof(*pool_flags));
    if (!pool_flags) {
        // the checks below are advisory only; skip them rather than
        // failing the whole margo initialization on a transient OOM
        MARGO_WARNING(0,
                      "Could not allocate memory for configuration sanity "
                      "checks, skipping them");
        return true;
    }

    // inspect the main scheduler of each ES and record, for every pool,
    // how it is positioned relative to the progress pool
    for (int i = 0; i < abt->xstreams_len; ++i) {
        margo_abt_xstream_t* es    = &abt->xstreams[i];
        ABT_sched            sched = ABT_SCHED_NULL;
        ABT_xstream_get_main_sched(es->xstream, &sched);
        int num_pools = 0;
        ABT_sched_get_num_pools(sched, &num_pools);
        bool this_es_has_progress_pool = false;
        for (int j = 0; j < num_pools; ++j) {
            ABT_pool pool = ABT_POOL_NULL;
            ABT_sched_get_pools(sched, 1, j, &pool);
            int pool_index = __margo_abt_find_pool_by_handle(abt, pool);
            // a pool not managed by margo yields a negative index; skip it
            // to avoid writing out of bounds in pool_flags
            if (pool_index < 0) continue;
            if (pool_index == progress_pool_idx)
                this_es_has_progress_pool = true;
            if (j == 0) pool_flags[pool_index] |= USED_AS_FIRST;
            if (this_es_has_progress_pool && pool_index != progress_pool_idx)
                pool_flags[pool_index] |= AFTER_PROGRESS;
            if (!this_es_has_progress_pool && pool_index != progress_pool_idx)
                pool_flags[pool_index] |= BEFORE_PROGRESS;
        }
        // second pass: mark all the pools that share this ES with the
        // progress pool (only runs if this ES uses the progress pool)
        for (int j = 0; j < num_pools && this_es_has_progress_pool; ++j) {
            ABT_pool pool = ABT_POOL_NULL;
            ABT_sched_get_pools(sched, 1, j, &pool);
            int pool_index = __margo_abt_find_pool_by_handle(abt, pool);
            if (pool_index < 0) continue;
            pool_flags[pool_index] |= WITH_PROGRESS;
        }
    }

    // Note: we only issue warnings because (1) some of these situations may be
    // corrected dynamically by adding new ES. We don't really know what the
    // user will do next, and (2) some of these situations depends on the type
    // of scheduler used, and whether it respects the order of its pools
    // strictly or not.
    for (int i = 0; i < abt->pools_len; ++i) {
        margo_abt_pool_t* pool = &abt->pools[i];
        if (pool_flags[i] == 0) {
            MARGO_WARNING(
                0,
                "Pool \"%s\" at index %d is not currently associated with any "
                "ES. ULT pushed into that pool will not get executed.",
                pool->name, i);
            continue;
        }
        if (!(pool_flags[i] & USED_AS_FIRST)) {
            MARGO_WARNING(
                0,
                "Pool \"%s\" at index %d is not the first pool of any ES. This "
                "could cause starvation for ULTs pushed in that pool.",
                pool->name, i);
        }
        if (i == progress_pool_idx) continue;
        // warnings below are only for non-progress pools
        if (!(pool_flags[i] & BEFORE_PROGRESS)) {
            MARGO_WARNING(
                0,
                "Pool \"%s\" at index %d does not appear before the progress "
                "pool in any ES. Depending on the type of scheduler used, this "
                "may cause ULTs pushed in that pool to never execute because "
                "the progress pool will keep the ES busy.",
                pool->name, i);
        }
        if (pool_flags[i] & AFTER_PROGRESS) {
            MARGO_WARNING(
                0,
                "Pool \"%s\" at index %d appears after the progress pool in at "
                "least one ES. Depending on the type of scheduler used, this "
                "ES may never pull ULTs from that pool because the progress "
                "pool will keep the ES busy.",
                pool->name, i);
        }
        if (pool_flags[i] & WITH_PROGRESS) {
            MARGO_WARNING(
                0,
                "Pool \"%s\" at index %d is used by an ES that is also "
                "associated with the progress pool. This may cause ULTs pushed "
                "into that pool to get unnecessarily delayed.",
                pool->name, i);
        }
    }
    free(pool_flags);
    return true;
}

static void confirm_argobots_configuration(struct json_object* config)
{
/* this function assumes that the json is already fully populated */
Expand Down
10 changes: 8 additions & 2 deletions tests/unit-tests/Makefile.subdir
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ check_PROGRAMS += \
tests/unit-tests/margo-comm-error \
tests/unit-tests/margo-comm-finalize \
tests/unit-tests/margo-forward \
tests/unit-tests/margo-monitoring
tests/unit-tests/margo-monitoring \
tests/unit-tests/margo-sanity-warnings

TESTS += \
tests/unit-tests/margo-addr \
Expand All @@ -36,7 +37,8 @@ TESTS += \
tests/unit-tests/margo-comm-error \
tests/unit-tests/margo-comm-finalize \
tests/unit-tests/margo-forward \
tests/unit-tests/margo-monitoring
tests/unit-tests/margo-monitoring \
tests/unit-tests/margo-sanity-warnings

tests_unit_tests_margo_addr_SOURCES = \
tests/unit-tests/munit/munit.c \
Expand Down Expand Up @@ -120,6 +122,10 @@ tests_unit_tests_margo_monitoring_SOURCES = \
tests/unit-tests/munit/munit.c \
tests/unit-tests/margo-monitoring.c

tests_unit_tests_margo_sanity_warnings_SOURCES = \
tests/unit-tests/munit/munit.c \
tests/unit-tests/margo-sanity-warnings.c

noinst_HEADERS += tests/unit-tests/munit/munit.h \
tests/unit-tests/helper-server.h
endif
224 changes: 224 additions & 0 deletions tests/unit-tests/margo-sanity-warnings.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@

#include <stdlib.h>
#include <string.h>

#include <margo.h>
#include "helper-server.h"
#include "munit/munit.h"

/* logger installed globally so that warnings emitted during
 * margo_init_ext() can be captured and checked by the tests */
static struct margo_logger test_logger = {0};

/* per-test state: buffer accumulating every message received
 * by the custom logger */
struct test_context {
    margo_instance_id mid;
    char* log_buffer;     /* accumulated log messages */
    int log_buffer_pos;   /* current write offset into log_buffer */
    int log_buffer_size;  /* total capacity of log_buffer in bytes */
};

/* Logging callback: appends the message to the context's capture buffer
 * so that tests can assert on the exact warnings emitted by margo. */
static void test_log_fn(void* uargs, const char* str)
{
    struct test_context* ctx = uargs;
    size_t               len = strlen(str);

    /* abort the test rather than overflow the capture buffer */
    munit_assert_int(len + ctx->log_buffer_pos, <, ctx->log_buffer_size);

    /* append the message, including its NUL terminator, at the offset */
    memcpy(ctx->log_buffer + ctx->log_buffer_pos, str, len + 1);
    ctx->log_buffer_pos += (int)len;
}

/* Fixture setup: allocates the capture buffer and installs the custom
 * global logger so that every margo message lands in the buffer.
 * Returns the test context passed to each test and to tear_down. */
static void* test_context_setup(const MunitParameter params[], void* user_data)
{
    (void)params;
    (void)user_data;
    int ret;

    struct test_context* ctx = calloc(1, sizeof(*ctx));
    munit_assert_not_null(ctx); /* fail fast instead of dereferencing NULL */
    ctx->log_buffer_size = 102400;
    ctx->log_buffer      = calloc(ctx->log_buffer_size, 1);
    munit_assert_not_null(ctx->log_buffer);

    /* set up custom logger to make it easier to validate output */
    test_logger.uargs    = ctx;
    test_logger.trace    = test_log_fn;
    test_logger.debug    = test_log_fn;
    test_logger.info     = test_log_fn;
    test_logger.warning  = test_log_fn;
    test_logger.error    = test_log_fn;
    test_logger.critical = test_log_fn;
    ret = margo_set_global_logger(&test_logger);
    munit_assert_int(ret, ==, 0);

    return ctx;
}

/* Fixture teardown: releases the capture buffer and the context.
 * The margo instance itself is finalized by each test. */
static void test_context_tear_down(void* data)
{
    struct test_context* ctx = data;
    free(ctx->log_buffer);
    free(ctx);
}

static MunitResult pool_is_not_used(const MunitParameter params[], void* data)
{
(void)params;
struct test_context* ctx = (struct test_context*)data;

const char* config = "{"
"\"argobots\":{"
"\"pools\":["
"{\"name\":\"__primary__\",\"kind\":\"fifo_wait\"},"
"{\"name\":\"p1\",\"kind\":\"fifo_wait\"}"
"],"
"\"xstreams\":["
"{\"name\":\"__primary__\","
"\"scheduler\":{\"pools\":[\"__primary__\"],\"type\":\"basic_wait\"}}"
"]"
"},"
"\"progress_pool\":\"__primary__\""
"}";

struct margo_init_info info = {0};
info.json_config = config;
margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info);
munit_assert_not_null(mid);

munit_assert_int(ctx->log_buffer_pos, !=, 0);
char* expected_content = "Pool \"p1\" at index 1 is not currently associated"
" with any ES. ULT pushed into that pool will not get executed.";
munit_assert_string_equal(ctx->log_buffer, expected_content);
margo_finalize(mid);

return MUNIT_OK;
}

static MunitResult pool_is_not_first(const MunitParameter params[], void* data)
{
(void)params;
struct test_context* ctx = (struct test_context*)data;

const char* config = "{"
"\"argobots\":{"
"\"pools\":["
"{\"name\":\"__primary__\",\"kind\":\"fifo_wait\"},"
"{\"name\":\"p1\",\"kind\":\"fifo_wait\"}"
"],"
"\"xstreams\":["
"{\"name\":\"__primary__\","
"\"scheduler\":{\"pools\":[\"__primary__\",\"p1\"],\"type\":\"basic_wait\"}}"
"]"
"},"
"\"use_progress_thread\":true"
"}";

struct margo_init_info info = {0};
info.json_config = config;
margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info);
munit_assert_not_null(mid);

munit_assert_int(ctx->log_buffer_pos, !=, 0);
char* expected_content = "Pool \"p1\" at index 1 is not the first pool of any ES. "
"This could cause starvation for ULTs pushed in that pool.";
munit_assert_string_equal(ctx->log_buffer, expected_content);
margo_finalize(mid);

return MUNIT_OK;
}

static MunitResult pool_not_before_progress(const MunitParameter params[], void* data)
{
(void)params;
struct test_context* ctx = (struct test_context*)data;

const char* config = "{"
"\"argobots\":{"
"\"pools\":["
"{\"name\":\"__primary__\",\"kind\":\"fifo_wait\"},"
"{\"name\":\"p1\",\"kind\":\"fifo_wait\"}"
"],"
"\"xstreams\":["
"{\"name\":\"__primary__\","
"\"scheduler\":{\"pools\":[\"__primary__\",\"p1\"],\"type\":\"basic_wait\"}}"
"]"
"},"
"\"progress_pool\":\"__primary__\""
"}";

struct margo_init_info info = {0};
info.json_config = config;
margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info);
munit_assert_not_null(mid);

munit_assert_int(ctx->log_buffer_pos, !=, 0);
char* expected_content =
"Pool \"p1\" at index 1 is not the first pool of any ES."
" This could cause starvation for ULTs pushed in that pool."
"Pool \"p1\" at index 1 does not appear before the progress pool in any ES."
" Depending on the type of scheduler used, this may cause ULTs pushed in that"
" pool to never execute because the progress pool will keep the ES busy."
"Pool \"p1\" at index 1 appears after the progress pool in at least one ES."
" Depending on the type of scheduler used, this ES may never pull ULTs from"
" that pool because the progress pool will keep the ES busy."
"Pool \"p1\" at index 1 is used by an ES that is also associated with the"
" progress pool. This may cause ULTs pushed into that pool to get unnecessarily delayed.";
munit_assert_string_equal(ctx->log_buffer, expected_content);
margo_finalize(mid);
return MUNIT_OK;
}

/* Pool "p1" is first in es1 and before the progress pool there, but the
 * primary ES lists it after the progress pool: margo must warn about the
 * AFTER_PROGRESS and WITH_PROGRESS situations only. */
static MunitResult progress_pool_is_not_last(const MunitParameter params[], void* data)
{
    (void)params;
    struct test_context* ctx = (struct test_context*)data;

    /* fix: the original config had a trailing comma after the "es1" object,
     * which is not valid strict JSON */
    const char* config = "{"
        "\"argobots\":{"
        "\"pools\":["
        "{\"name\":\"__primary__\",\"kind\":\"fifo_wait\"},"
        "{\"name\":\"p1\",\"kind\":\"fifo_wait\"}"
        "],"
        "\"xstreams\":["
        "{\"name\":\"__primary__\","
        "\"scheduler\":{\"pools\":[\"__primary__\", \"p1\"],\"type\":\"basic_wait\"}},"
        "{\"name\":\"es1\","
        "\"scheduler\":{\"pools\":[\"p1\", \"__primary__\"],\"type\":\"basic_wait\"}}"
        "]"
        "},"
        "\"progress_pool\":\"__primary__\""
        "}";

    struct margo_init_info info = {0};
    info.json_config            = config;
    margo_instance_id mid = margo_init_ext("na+sm", MARGO_SERVER_MODE, &info);
    munit_assert_not_null(mid);

    munit_assert_int(ctx->log_buffer_pos, !=, 0);
    char* expected_content = "Pool \"p1\" at index 1 appears after the progress pool"
        " in at least one ES. Depending on the type of scheduler used, this ES may "
        "never pull ULTs from that pool because the progress pool will keep the ES busy."
        "Pool \"p1\" at index 1 is used by an ES that is also associated with the progress pool."
        " This may cause ULTs pushed into that pool to get unnecessarily delayed.";
    munit_assert_string_equal(ctx->log_buffer, expected_content);
    margo_finalize(mid);

    return MUNIT_OK;
}

/* every test shares the same fixture setup/teardown and options */
#define SANITY_TEST(path, fn)                              \
    {                                                      \
        path, fn, test_context_setup, test_context_tear_down, \
            MUNIT_TEST_OPTION_NONE, NULL                   \
    }

static MunitTest tests[] = {
    SANITY_TEST("/pool_is_not_used", pool_is_not_used),
    SANITY_TEST("/pool_is_not_first", pool_is_not_first),
    SANITY_TEST("/pool_not_before_progress", pool_not_before_progress),
    SANITY_TEST("/progress_pool_is_not_last", progress_pool_is_not_last),
    {NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL}};

static const MunitSuite test_suite
    = {"/margo", tests, NULL, 1, MUNIT_SUITE_OPTION_NONE};

int main(int argc, char** argv)
{
    return munit_suite_main(&test_suite, NULL, argc, argv);
}
Loading