Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: refactored event handling #290

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions src/margo-abt-config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*
* See COPYRIGHT in top-level directory.
*/
#include <errno.h>
#include <sys/eventfd.h>
#include "margo-abt-config.h"

static inline char* generate_unused_pool_name(const margo_abt_t* abt);
Expand Down Expand Up @@ -32,7 +34,7 @@
"mpsc", "spmc", "mpmc");
}

/* default: "fifo_wait" */
/* default: "prio_wait" */
ASSERT_CONFIG_HAS_OPTIONAL(jpool, "kind", string, "pool");
json_object_t* jkind = json_object_object_get(jpool, "kind");
if (jkind) {
Expand Down Expand Up @@ -81,7 +83,7 @@
: generate_unused_pool_name(abt);

pool->kind
= strdup(json_object_object_get_string_or(jpool, "kind", "fifo_wait"));
= strdup(json_object_object_get_string_or(jpool, "kind", "prio_wait"));
if (strcmp(pool->kind, "fifo_wait") == 0)
kind = ABT_POOL_FIFO_WAIT;
else if (strcmp(pool->kind, "fifo") == 0)
Expand All @@ -106,6 +108,7 @@
access = ABT_POOL_ACCESS_SPMC;
}

pool->efd = -1;
int ret;
if (kind != (ABT_pool_kind)(-1)) {
if (!pool->access) pool->access = strdup("mpmc");
Expand All @@ -116,13 +119,39 @@
}
} else if (strcmp(pool->kind, "prio_wait") == 0) {
if (!pool->access) pool->access = strdup("mpmc");
ABT_pool_def prio_pool_def;
ABT_pool_def prio_pool_def;
ABT_pool_config prio_pool_config;
margo_create_prio_pool_def(&prio_pool_def);
ret = ABT_pool_create(&prio_pool_def, ABT_POOL_CONFIG_NULL,
&pool->pool);

/*****************************************************/
/* TODO: prototyping.
* Also note that really this should only be done if event mode is
* activated and even then probably only for the progress pool. May have
* to rearrange code a little to make sure we can determine those
* things before initializing pool.
*/
pool->efd = eventfd(0, EFD_NONBLOCK);
if (pool->efd < 0) {
margo_error(mid, "eventfd failed with error code %d", errno);

Check warning on line 135 in src/margo-abt-config.c

View check run for this annotation

Codecov / codecov/patch

src/margo-abt-config.c#L135

Added line #L135 was not covered by tests
}
ret = ABT_pool_config_create(&prio_pool_config);
if (ret != ABT_SUCCESS) {
margo_error(mid, "ABT_pool_config_create failed with error code %d",

Check warning on line 139 in src/margo-abt-config.c

View check run for this annotation

Codecov / codecov/patch

src/margo-abt-config.c#L139

Added line #L139 was not covered by tests
ret);
}
ret = ABT_pool_config_set(prio_pool_config,
MARGO_PRIO_POOL_CONFIG_KEY_EFD,
ABT_POOL_CONFIG_INT, &pool->efd);
if (ret != ABT_SUCCESS) {
margo_error(mid, "ABT_pool_config_set failed with error code %d",

Check warning on line 146 in src/margo-abt-config.c

View check run for this annotation

Codecov / codecov/patch

src/margo-abt-config.c#L146

Added line #L146 was not covered by tests
ret);
}
/*****************************************************/
ret = ABT_pool_create(&prio_pool_def, prio_pool_config, &pool->pool);
if (ret != ABT_SUCCESS) {
margo_error(mid, "ABT_pool_create failed with error code %d", ret);
}
ABT_pool_config_free(&prio_pool_config);
} else if (strcmp(pool->kind, "earliest_first") == 0) {
if (!pool->access) pool->access = strdup("mpmc");
ABT_pool_def efirst_pool_def;
Expand Down
8 changes: 5 additions & 3 deletions src/margo-abt-config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ typedef struct margo_abt_pool {
bool margo_free_flag; /* flag if Margo is responsible for freeing */
bool used_by_primary; /* flag indicating the this pool is used by the
primary ES */
int efd; /* event fd for notification of transition from idle to busy; -1 if
not supported or not enabled */
} margo_abt_pool_t;

bool __margo_abt_pool_validate_json(const json_object_t* config,
Expand Down Expand Up @@ -109,9 +111,9 @@ void __margo_abt_sched_destroy(margo_abt_sched_t* sched);
* margo is responsible for explicitly free'ing the ES or not.
*/
typedef struct margo_abt_xstream {
char* name;
ABT_xstream xstream;
_Atomic(uint32_t) refcount; /* Number of external use this xstream */
char* name;
ABT_xstream xstream;
_Atomic(uint32_t) refcount; /* Number of external use this xstream */
struct margo_abt_sched sched;
bool margo_free_flag; /* flag if Margo is responsible for freeing */
} margo_abt_xstream_t;
Expand Down
106 changes: 17 additions & 89 deletions src/margo-prio-pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@

typedef struct unit_t {
ABT_thread thread;
ABT_task task;
struct unit_t* p_prev;
struct unit_t* p_next;
int sched_counter;
ABT_bool is_in_pool;
} unit_t;

typedef struct queue_t {
Expand All @@ -60,7 +58,6 @@
p_unit->p_next = p_head;
p_queue->p_tail = p_unit;
}
p_unit->is_in_pool = ABT_TRUE;
}

static inline unit_t* queue_pop(queue_t* p_queue)
Expand All @@ -78,9 +75,8 @@
p_unit->p_next->p_prev = p_unit->p_prev;
p_queue->p_head = p_unit->p_next;
}
p_unit->p_next = NULL;
p_unit->p_prev = NULL;
p_unit->is_in_pool = ABT_FALSE;
p_unit->p_next = NULL;
p_unit->p_prev = NULL;
return p_unit;
}
}
Expand All @@ -92,57 +88,16 @@
int cnt;
pthread_mutex_t mutex;
pthread_cond_t cond;
int efd;
} pool_t;

static ABT_unit_type pool_unit_get_type(ABT_unit unit)
{
unit_t* p_unit = (unit_t*)unit;
if (p_unit->thread != ABT_THREAD_NULL) {
return ABT_UNIT_TYPE_THREAD;
} else {
return ABT_UNIT_TYPE_TASK;
}
}

static ABT_thread pool_unit_get_thread(ABT_unit unit)
{
unit_t* p_unit = (unit_t*)unit;
return p_unit->thread;
}

static ABT_task pool_unit_get_task(ABT_unit unit)
{
unit_t* p_unit = (unit_t*)unit;
return p_unit->task;
}

static ABT_bool pool_unit_is_in_pool(ABT_unit unit)
{
unit_t* p_unit = (unit_t*)unit;
return p_unit->is_in_pool;
}

static ABT_unit pool_unit_create_from_thread(ABT_thread thread)
{
unit_t* p_unit = (unit_t*)malloc(sizeof(unit_t));
unit_t* p_unit = (unit_t*)calloc(1, sizeof(unit_t));
p_unit->thread = thread;
p_unit->task = ABT_TASK_NULL;
p_unit->p_next = NULL;
p_unit->p_prev = NULL;
p_unit->sched_counter = 0;
p_unit->is_in_pool = ABT_FALSE;
return (ABT_unit)p_unit;
}

static ABT_unit pool_unit_create_from_task(ABT_task task)
{
unit_t* p_unit = (unit_t*)malloc(sizeof(unit_t));
p_unit->thread = ABT_THREAD_NULL;
p_unit->task = task;
p_unit->p_next = NULL;
p_unit->p_prev = NULL;
p_unit->sched_counter = 0;
p_unit->is_in_pool = ABT_FALSE;
return (ABT_unit)p_unit;
}

Expand All @@ -154,16 +109,23 @@

static int pool_init(ABT_pool pool, ABT_pool_config config)
{
(void)config;
pool_t* p_pool = (pool_t*)malloc(sizeof(pool_t));
pool_t* p_pool = (pool_t*)calloc(1, sizeof(pool_t));
p_pool->high_prio_queue.p_tail = NULL;
p_pool->high_prio_queue.p_head = NULL;
p_pool->low_prio_queue.p_tail = NULL;
p_pool->low_prio_queue.p_head = NULL;
p_pool->num = 0;
p_pool->cnt = 0;
p_pool->efd = -1;
pthread_mutex_init(&p_pool->mutex, NULL);
pthread_cond_init(&p_pool->cond, NULL);
/* NOTE: we don't check return code here because efd will still be -1 if
* it fails.
*/
ABT_pool_config_get(config, MARGO_PRIO_POOL_CONFIG_KEY_EFD, NULL,
&p_pool->efd);
// fprintf(stderr, "DBG: prio_pool got efd %d\n", p_pool->efd);
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Commented-out code

This comment appears to contain commented-out code.

ABT_pool_set_data(pool, (void*)p_pool);
return ABT_SUCCESS;
}
Expand Down Expand Up @@ -210,7 +172,10 @@
queue_push(&p_pool->high_prio_queue, p_unit);
}
p_pool->num++;
pthread_cond_signal(&p_pool->cond);
/* only signal on the transition from empty to non-empty; in other cases
* there will be no one waiting on the condition.
*/
if (p_pool->num == 1) pthread_cond_signal(&p_pool->cond);
pthread_mutex_unlock(&p_pool->mutex);
}

Expand Down Expand Up @@ -289,37 +254,6 @@
return p_unit ? (ABT_unit)p_unit : ABT_UNIT_NULL;
}

static int pool_remove(ABT_pool pool, ABT_unit unit)
{
pool_t* p_pool;
ABT_pool_get_data(pool, (void**)&p_pool);
unit_t* p_unit = (unit_t*)unit;

pthread_mutex_lock(&p_pool->mutex);
if (p_unit->p_prev) {
p_unit->p_prev->p_next = p_unit->p_next;
} else {
if (p_pool->high_prio_queue.p_head == p_unit) {
p_pool->high_prio_queue.p_head = p_unit->p_next;
} else if (p_pool->low_prio_queue.p_head == p_unit) {
p_pool->low_prio_queue.p_head = p_unit->p_next;
}
}
if (p_unit->p_next) {
p_unit->p_next->p_prev = p_unit->p_prev;
} else {
if (p_pool->high_prio_queue.p_tail == p_unit) {
p_pool->high_prio_queue.p_tail = p_unit->p_prev;
} else if (p_pool->low_prio_queue.p_tail == p_unit) {
p_pool->low_prio_queue.p_tail = p_unit->p_prev;
}
}
p_pool->num--;
pthread_mutex_unlock(&p_pool->mutex);

return ABT_SUCCESS;
}

static int pool_free(ABT_pool pool)
{
pool_t* p_pool;
Expand All @@ -334,19 +268,13 @@
void margo_create_prio_pool_def(ABT_pool_def* p_def)
{
p_def->access = ABT_POOL_ACCESS_MPMC;
p_def->u_get_type = pool_unit_get_type;
p_def->u_get_thread = pool_unit_get_thread;
p_def->u_get_task = pool_unit_get_task;
p_def->u_is_in_pool = pool_unit_is_in_pool;
p_def->u_create_from_thread = pool_unit_create_from_thread;
p_def->u_create_from_task = pool_unit_create_from_task;
p_def->u_free = pool_unit_free;
p_def->p_init = pool_init;
p_def->p_get_size = pool_get_size;
p_def->p_push = pool_push;
p_def->p_pop = pool_pop;
p_def->p_pop_timedwait = pool_pop_timedwait;
p_def->p_remove = pool_remove;
p_def->p_free = pool_free;
p_def->p_print_all = NULL; /* Optional. */
}
1 change: 1 addition & 0 deletions src/margo-prio-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ extern "C" {
#include <abt.h>

void margo_create_prio_pool_def(ABT_pool_def* p_def);
#define MARGO_PRIO_POOL_CONFIG_KEY_EFD 0

#ifdef __cplusplus
}
Expand Down
4 changes: 2 additions & 2 deletions tests/unit-tests/margo-abt-pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ static MunitResult rpc_pool_kind(const MunitParameter params[], void* data)
count = count_occurrence(runtime_config, "my_pool");
munit_assert_int_goto(count, ==, 1, error);

/* just one pool with the prio_wait kind */
/* just one pool with the fifo_wait kind */
count = count_occurrence(runtime_config, pool_kind);
munit_assert_int_goto(count, ==, 1, error);

Expand Down Expand Up @@ -139,7 +139,7 @@ static MunitResult rpc_pool_kind(const MunitParameter params[], void* data)
}

static char* pool_params[] = {
"prio_wait",
"fifo_wait",
"earliest_first",
NULL
};
Expand Down
Loading
Loading