diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index c9366aa..8daeba2 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -36,15 +36,14 @@ jobs: - name: Build code and run unit tests run: | - spack env activate -d tests - eval `spack env activate --sh tests` - ./prepare.sh - ./configure --enable-coverage --prefix=`pwd` + eval `spack env activate --sh tests` && + ./prepare.sh && + ./configure --enable-coverage --prefix=`pwd` && make check - make clean - uses: codecov/codecov-action@v3 with: + token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: true verbose: true gcov: true diff --git a/include/margo-timer.h b/include/margo-timer.h index 5c452bb..7967994 100644 --- a/include/margo-timer.h +++ b/include/margo-timer.h @@ -26,6 +26,7 @@ typedef struct margo_timer* margo_timer_t; /** * @brief Creates a timer object. + * The callback will be submitted to the handler pool. * * @param mid Margo instance * @param cb_fn Callback to call when the timer finishes @@ -39,6 +40,33 @@ int margo_timer_create(margo_instance_id mid, void* cb_dat, margo_timer_t* timer); +/** + * @brief Creates a timer object and specifies the pool + * in which to run the callback. + * + * @note Passing ABT_POOL_NULL as the pool is allowed. + * In this case, the callback will be invoked directly + * within the ULT that runs the progress loop. This should + * generally be avoided unless the callback is very short, + * and does not call any margo_* or HG_* function. A typical + * example of a valid callback would be one that simply + * sets the value of an ABT_eventual, or one that submits + * a ULT and returns. + * + * @param mid Margo instance + * @param cb_fn Callback to call when the timer finishes + * @param cb_dat Callback data + * @param pool Pool in which to run the callback + * @param timer Resulting timer + * + * @return 0 on success, negative value on failure + */ +int margo_timer_create_with_pool(margo_instance_id mid, + margo_timer_callback_fn cb_fn, + void* cb_dat, + ABT_pool pool, + margo_timer_t* timer); + /** * @brief Start the timer with a provided timeout. * @@ -50,7 +78,15 @@ int margo_timer_create(margo_instance_id mid, int margo_timer_start(margo_timer_t timer, double timeout_ms); /** - * @brief Cancel a started timer. + * @brief Cancel a started timer. If the timer's callback + * has already been submitted as a ULT, this function will + * block until the ULT has executed. If the ULT hasn't started + * yet when margo_timer_cancel is called, the ULT won't run + * the callback and will simply return. If the ULT had started, + * it will run the callback to completion. + * + * This function guarantees that there won't be any invokation + * of the callback after margo_timer_cancel returns. * * @param timer Timer to cancel * @@ -59,8 +95,29 @@ int margo_timer_start(margo_timer_t timer, double timeout_ms); int margo_timer_cancel(margo_timer_t timer); /** - * @brief Destroys a timer. If the timer was started, - * this function will also cancel it. + * @brief Cancel n timers, blocking until it can ensure that + * no callback associated with any of the timers will be called. + * + * This function is more efficient than calling margo_timer_cancel + * in a loop because it requests the cancelation of all the timers + * before blocking. + * + * @warning All the timers must be associated with the same margo + * instance. + * + * @param n Number of timers + * @param timer Array of timers to cancel + * + * @return 0 on success, negative value on failure + */ +int margo_timer_cancel_many(size_t n, margo_timer_t* timers); + +/** + * @brief Destroys a timer. + * + * @important This function will not cancel the timer. + * If it was started, it will still fire, and the timer's + * memory will be freed afterward. * * @param timer Timer to destroy. * diff --git a/src/margo-core.c b/src/margo-core.c index f744df8..ecc9644 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -165,7 +165,7 @@ static void margo_cleanup(margo_instance_id mid) /* shut down pending timers */ MARGO_TRACE(mid, "Cleaning up pending timers"); - __margo_timer_list_free(mid, mid->timer_list); + __margo_timer_list_free(mid); MARGO_TRACE(mid, "Destroying mutex and condition variables"); ABT_mutex_free(&mid->finalize_mutex); @@ -883,12 +883,11 @@ static hg_return_t margo_cb(const struct hg_cb_info* info) if (hret == HG_CANCELED && req->timer) { hret = HG_TIMEOUT; } - /* remove timer if there is one and it is still in place (i.e., not timed - * out) */ - if (hret != HG_TIMEOUT && req->timer && req->handle) { - __margo_timer_destroy(mid, req->timer); + /* remove timer if there is one and it is still in place */ + if (req->timer) { + margo_timer_cancel(req->timer); + margo_timer_destroy(req->timer); } - if (req->timer) { free(req->timer); } if (req->kind == MARGO_REQ_CALLBACK) { if (req->u.callback.cb) req->u.callback.cb(req->u.callback.uargs, hret); @@ -955,7 +954,6 @@ static void margo_forward_timeout_cb(void* arg) margo_request req = (margo_request)arg; /* cancel the Mercury op if the forward timed out */ HG_Cancel(req->handle); - return; } static hg_return_t margo_provider_iforward_internal( @@ -1084,18 +1082,24 @@ static hg_return_t margo_provider_iforward_internal( if (timeout_ms > 0) { /* set a timer object to expire when this forward times out */ - req->timer = calloc(1, sizeof(*(req->timer))); - if (!(req->timer)) { + hret = margo_timer_create_with_pool(mid, margo_forward_timeout_cb, req, + ABT_POOL_NULL, &req->timer); + if (hret != HG_SUCCESS) { // LCOV_EXCL_START MARGO_EVENTUAL_FREE(&eventual); - margo_error(mid, "in %s: could not allocate memory for timer", - __func__); - hret = HG_NOMEM_ERROR; + margo_error(mid, "in %s: could not create timer", __func__); + goto finish; + // LCOV_EXCL_END + } + hret = margo_timer_start(req->timer, timeout_ms); + if (hret != HG_SUCCESS) { + // LCOV_EXCL_START + margo_timer_destroy(req->timer); + MARGO_EVENTUAL_FREE(&eventual); + margo_error(mid, "in %s: could not start timer", __func__); goto finish; // LCOV_EXCL_END } - __margo_timer_init(mid, req->timer, margo_forward_timeout_cb, req, - timeout_ms); } // get parent RPC id @@ -1120,8 +1124,8 @@ static hg_return_t margo_provider_iforward_internal( /* remove timer if HG_Forward failed */ if (hret != HG_SUCCESS && req->timer) { // LCOV_EXCL_START - __margo_timer_destroy(mid, req->timer); - free(req->timer); + margo_timer_cancel(req->timer); + margo_timer_destroy(req->timer); req->timer = NULL; // LCOV_EXCL_END } @@ -1805,7 +1809,7 @@ static void margo_thread_sleep_cb(void* arg) void margo_thread_sleep(margo_instance_id mid, double timeout_ms) { - margo_timer sleep_timer; + margo_timer_t sleep_timer; margo_thread_sleep_cb_dat sleep_cb_dat; /* monitoring */ @@ -1822,8 +1826,9 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms) sleep_cb_dat.is_asleep = 1; /* initialize the sleep timer */ - __margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb, &sleep_cb_dat, - timeout_ms); + margo_timer_create_with_pool(mid, margo_thread_sleep_cb, &sleep_cb_dat, + ABT_POOL_NULL, &sleep_timer); + margo_timer_start(sleep_timer, timeout_ms); /* yield thread for specified timeout */ ABT_mutex_lock(sleep_cb_dat.mutex); @@ -1835,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms) ABT_mutex_free(&sleep_cb_dat.mutex); ABT_cond_free(&sleep_cb_dat.cond); + margo_timer_destroy(sleep_timer); + /* monitoring */ __MARGO_MONITOR(mid, FN_END, sleep, monitoring_args); } diff --git a/src/margo-init.c b/src/margo-init.c index f1ecd06..4a6e30f 100644 --- a/src/margo-init.c +++ b/src/margo-init.c @@ -369,7 +369,7 @@ margo_instance_id margo_init_ext(const char* address, error: if (mid) { __margo_handle_cache_destroy(mid); - __margo_timer_list_free(mid, mid->timer_list); + __margo_timer_list_free(mid); ABT_mutex_free(&mid->finalize_mutex); ABT_cond_free(&mid->finalize_cond); ABT_mutex_free(&mid->pending_operations_mtx); diff --git a/src/margo-instance.h b/src/margo-instance.h index 8e19fc7..19828bf 100644 --- a/src/margo-instance.h +++ b/src/margo-instance.h @@ -18,6 +18,7 @@ #include #include "margo.h" +#include "margo-timer.h" #include "margo-config.h" #include "margo-abt-config.h" #include "margo-hg-config.h" @@ -129,13 +130,14 @@ struct margo_instance { #define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool -typedef enum margo_request_kind { +typedef enum margo_request_kind +{ MARGO_REQ_EVENTUAL, MARGO_REQ_CALLBACK } margo_request_kind; struct margo_request_struct { - margo_timer* timer; + margo_timer_t timer; margo_instance_id mid; hg_handle_t handle; margo_monitor_data_t monitor_data; @@ -157,10 +159,10 @@ struct margo_request_struct { struct margo_rpc_data { margo_instance_id mid; _Atomic(ABT_pool) pool; - char* rpc_name; - hg_proc_cb_t in_proc_cb; /* user-provided input proc */ - hg_proc_cb_t out_proc_cb; /* user-provided output proc */ - void* user_data; + char* rpc_name; + hg_proc_cb_t in_proc_cb; /* user-provided input proc */ + hg_proc_cb_t out_proc_cb; /* user-provided output proc */ + void* user_data; void (*user_free_callback)(void*); }; diff --git a/src/margo-timer-private.h b/src/margo-timer-private.h index 48871bf..bb3ecf0 100644 --- a/src/margo-timer-private.h +++ b/src/margo-timer-private.h @@ -13,18 +13,11 @@ extern "C" { #endif -typedef struct margo_timer { - margo_instance_id mid; - margo_timer_callback_fn cb_fn; - void* cb_dat; - double expiration; - struct margo_timer* next; - struct margo_timer* prev; -} margo_timer; +struct margo_timer_list; /** - * Creates a margo_timer_list. - * @returns a new margo_timer_list, or NULL if failed + * Creates and initializes the margo_timer_list associated with the + * margo instance. */ struct margo_timer_list* __margo_timer_list_create(); @@ -32,30 +25,7 @@ struct margo_timer_list* __margo_timer_list_create(); * Frees the timer list * @param [in] timer_lst timer list to free */ -void __margo_timer_list_free(margo_instance_id mid, - struct margo_timer_list* timer_lst); - -/** - * Initializes a margo timer object which will perform some action - * after a specified time duration - * @param [in] mid Margo instance - * @param [in] timer pointer to margo timer object to be initialized - * @param [in] cb_fn callback function for timeout action - * @param [in] cb_dat callback data passed to the callback function - * @param [in] timeout_ms timeout duration in milliseconds - */ -void __margo_timer_init(margo_instance_id mid, - margo_timer* timer, - margo_timer_callback_fn cb_fn, - void* cb_dat, - double timeout_ms); - -/** - * Destroys a margo timer object which was previously initialized - * @param [in] mid Margo instance - * @param [in] timer pointer to margo timer object to be destroyed - */ -void __margo_timer_destroy(margo_instance_id mid, margo_timer* timer); +void __margo_timer_list_free(margo_instance_id mid); /** * Checks for expired timers and performs specified timeout action @@ -73,11 +43,6 @@ void __margo_check_timers(margo_instance_id mid); int __margo_timer_get_next_expiration(margo_instance_id mid, double* next_timer_exp); -/** - * Gets the margo_timer_list from the margo instance. - */ -struct margo_timer_list* __margo_get_timer_list(margo_instance_id mid); - #ifdef __cplusplus } #endif diff --git a/src/margo-timer.c b/src/margo-timer.c index 406981c..4baa661 100644 --- a/src/margo-timer.c +++ b/src/margo-timer.c @@ -15,36 +15,104 @@ #include "margo-timer-private.h" #include "utlist.h" -/* structure for mapping margo instance ids to corresponding timer instances */ +/* Timer definition */ +typedef struct margo_timer { + margo_instance_id mid; + margo_timer_callback_fn cb_fn; + void* cb_dat; + ABT_pool pool; + double expiration; + + /* finalization mechanism, to ensure that no ULT associated with + * this timer remains to be executed. */ + ABT_mutex_memory mutex; + ABT_cond_memory cv; + size_t num_pending; + _Atomic bool canceled; + _Atomic bool destroy_requested; + + struct margo_timer_list* owner; + + struct margo_timer* next; + struct margo_timer* prev; +} margo_timer; + +/* List of timers sorted by expiration date */ struct margo_timer_list { - ABT_mutex mutex; - margo_timer* queue_head; + margo_timer* queue_head; + ABT_mutex_memory mutex; + /* finalization mechanism, to ensure that no ULT associated with timers + * remain */ + ABT_cond_memory cv; + size_t num_pending; + _Atomic bool destroy_requested; + /* Note: because ULTs associated with timers may still be pending + * when finalization is requested, we keep track of the number of + * pending ULTs and set destroy_requested to true upon finalizing. + * destroy_requested being true will prevent the submission of + * new timers. When num_pending reaches 0, __margo_timer_list_free + * unblocks and frees the list. */ }; -static void __margo_timer_queue(struct margo_timer_list* timer_lst, - margo_timer* timer); +static inline struct margo_timer_list* get_timer_list(margo_instance_id mid) +{ + return mid->timer_list; +} + +static inline void timer_list_cleanup(struct margo_timer_list* timer_lst) +{ + free(timer_lst); +} + +static inline void timer_cleanup(margo_timer_t timer) { free(timer); } + +static void timer_ult(void* args) +{ + margo_timer_t timer = (margo_timer_t)args; + struct margo_timer_list* timer_lst = timer->owner; + + if (!timer->canceled) timer->cb_fn(timer->cb_dat); + /* decrease the number of pending ULTs associated with the timer, + * check if destruction of the timer was requested, and cleanup + * if needed. */ + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + timer->num_pending -= 1; + bool no_more_pending = timer->num_pending == 0; + bool need_destroy = no_more_pending && timer->destroy_requested; + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + if (no_more_pending) + ABT_cond_signal(ABT_COND_MEMORY_GET_HANDLE(&timer->cv)); + if (need_destroy) timer_cleanup(timer); + + /* decrease the number of pending ULTs associated with any timer + * belonging to the same list, and notify the condition variable + * when reaching 0 to potentiallyunblock __margo_timer_list_free. */ + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + timer_lst->num_pending -= 1; + no_more_pending = timer_lst->num_pending == 0; + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + if (no_more_pending) + ABT_cond_signal(ABT_COND_MEMORY_GET_HANDLE(&timer_lst->cv)); +} struct margo_timer_list* __margo_timer_list_create() { struct margo_timer_list* timer_lst; - timer_lst = malloc(sizeof(*timer_lst)); + timer_lst = calloc(1, sizeof(*timer_lst)); if (!timer_lst) return NULL; - ABT_mutex_create(&(timer_lst->mutex)); - timer_lst->queue_head = NULL; - return timer_lst; } -void __margo_timer_list_free(margo_instance_id mid, - struct margo_timer_list* timer_lst) +void __margo_timer_list_free(margo_instance_id mid) { - margo_timer* cur; - ABT_pool handler_pool; - int ret; + struct margo_timer_list* timer_lst = get_timer_list(mid); + margo_timer* cur; + int ret; - ABT_mutex_lock(timer_lst->mutex); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + timer_lst->destroy_requested = true; /* delete any remaining timers from the queue */ while (timer_lst->queue_head) { cur = timer_lst->queue_head; @@ -52,64 +120,32 @@ void __margo_timer_list_free(margo_instance_id mid, cur->prev = cur->next = NULL; /* we must issue the callback now for any pending timers or else the - * callers will hang indefinitely + * callers may hang indefinitely */ - margo_get_handler_pool(mid, &handler_pool); - if (handler_pool != ABT_POOL_NULL) { - /* if handler pool is present, run callback there */ - ret = ABT_thread_create(handler_pool, cur->cb_fn, cur->cb_dat, + if (cur->pool != ABT_POOL_NULL) { + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&cur->mutex)); + cur->num_pending += 1; + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&cur->mutex)); + + timer_lst->num_pending += 1; + + ret = ABT_thread_create(cur->pool, timer_ult, cur, ABT_THREAD_ATTR_NULL, NULL); assert(ret == ABT_SUCCESS); } else { - /* else run callback in place */ cur->cb_fn(cur->cb_dat); } } - ABT_mutex_unlock(timer_lst->mutex); - ABT_mutex_free(&(timer_lst->mutex)); - - free(timer_lst); - - return; -} - -void __margo_timer_init(margo_instance_id mid, - margo_timer* timer, - margo_timer_callback_fn cb_fn, - void* cb_dat, - double timeout_ms) -{ - struct margo_timer_list* timer_lst; - - timer_lst = __margo_get_timer_list(mid); - assert(timer_lst); - assert(timer); - - memset(timer, 0, sizeof(*timer)); - timer->mid = mid; - timer->cb_fn = cb_fn; - timer->cb_dat = cb_dat; - timer->expiration = ABT_get_wtime() + (timeout_ms / 1000); - timer->prev = timer->next = NULL; - - __margo_timer_queue(timer_lst, timer); - - return; -} - -void __margo_timer_destroy(margo_instance_id mid, margo_timer* timer) -{ - struct margo_timer_list* timer_lst; - - timer_lst = __margo_get_timer_list(mid); - assert(timer_lst); - assert(timer); - - ABT_mutex_lock(timer_lst->mutex); - if (timer->prev || timer->next) DL_DELETE(timer_lst->queue_head, timer); - ABT_mutex_unlock(timer_lst->mutex); - return; + /* check if we can cleanup the list or if cleanup will be done by + * one of the submitted ULTs */ + while (timer_lst->num_pending != 0) { + ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE(&timer_lst->cv), + ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + } + timer_list_cleanup(timer_lst); + /* Note: no need to call ABT_mutex_unlock, + * the timer_lst has been freed at this point */ } void __margo_check_timers(margo_instance_id mid) @@ -117,14 +153,12 @@ void __margo_check_timers(margo_instance_id mid) int ret; margo_timer* cur; struct margo_timer_list* timer_lst; - ABT_pool handler_pool; double now; - timer_lst = __margo_get_timer_list(mid); + timer_lst = get_timer_list(mid); assert(timer_lst); - margo_get_handler_pool(mid, &handler_pool); - ABT_mutex_lock(timer_lst->mutex); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); if (timer_lst->queue_head) now = ABT_get_wtime(); @@ -136,12 +170,21 @@ void __margo_check_timers(margo_instance_id mid) DL_DELETE(timer_lst->queue_head, cur); cur->prev = cur->next = NULL; - /* schedule callback on the handler pool */ - ret = ABT_thread_create(handler_pool, cur->cb_fn, cur->cb_dat, - ABT_THREAD_ATTR_NULL, NULL); - assert(ret == ABT_SUCCESS); + if (cur->pool != ABT_POOL_NULL) { + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&cur->mutex)); + cur->num_pending += 1; + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&cur->mutex)); + + timer_lst->num_pending += 1; + + ret = ABT_thread_create(cur->pool, timer_ult, cur, + ABT_THREAD_ATTR_NULL, NULL); + assert(ret == ABT_SUCCESS); + } else { + cur->cb_fn(cur->cb_dat); + } } - ABT_mutex_unlock(timer_lst->mutex); + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); return; } @@ -156,10 +199,10 @@ int __margo_timer_get_next_expiration(margo_instance_id mid, double now; int ret; - timer_lst = __margo_get_timer_list(mid); + timer_lst = get_timer_list(mid); assert(timer_lst); - ABT_mutex_lock(timer_lst->mutex); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); if (timer_lst->queue_head) { now = ABT_get_wtime(); *next_timer_exp = timer_lst->queue_head->expiration - now; @@ -167,9 +210,9 @@ int __margo_timer_get_next_expiration(margo_instance_id mid, } else { ret = -1; } - ABT_mutex_unlock(timer_lst->mutex); + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); - return (ret); + return ret; } static void __margo_timer_queue(struct margo_timer_list* timer_lst, @@ -177,7 +220,7 @@ static void __margo_timer_queue(struct margo_timer_list* timer_lst, { margo_timer* cur; - ABT_mutex_lock(timer_lst->mutex); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); /* if list of timers is empty, put ourselves on it */ if (!(timer_lst->queue_head)) { @@ -205,36 +248,53 @@ static void __margo_timer_queue(struct margo_timer_list* timer_lst, if (timer->prev == NULL && timer->next == NULL) DL_PREPEND(timer_lst->queue_head, timer); } - ABT_mutex_unlock(timer_lst->mutex); + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); return; } -struct margo_timer_list* __margo_get_timer_list(margo_instance_id mid) -{ - return mid->timer_list; -} - int margo_timer_create(margo_instance_id mid, margo_timer_callback_fn cb_fn, void* cb_dat, margo_timer_t* timer) { + ABT_pool pool; + int ret = margo_get_handler_pool(mid, &pool); + if (ret != 0) return ret; + return margo_timer_create_with_pool(mid, cb_fn, cb_dat, pool, timer); +} + +int margo_timer_create_with_pool(margo_instance_id mid, + margo_timer_callback_fn cb_fn, + void* cb_dat, + ABT_pool pool, + margo_timer_t* timer) +{ + if (!pool) pool = ABT_POOL_NULL; + + struct margo_timer_list* timer_lst = get_timer_list(mid); + if (timer_lst->destroy_requested) return -1; + margo_timer_t tmp = (margo_timer_t)calloc(1, sizeof(*tmp)); if (!tmp) return -1; tmp->mid = mid; tmp->cb_fn = cb_fn; tmp->cb_dat = cb_dat; + tmp->pool = pool; + tmp->owner = timer_lst; *timer = tmp; return 0; } int margo_timer_start(margo_timer_t timer, double timeout_ms) { - if (timer->prev != NULL || timer->next != NULL) return -1; + struct margo_timer_list* timer_lst = get_timer_list(timer->mid); + bool already_started = timer->prev != NULL || timer->next != NULL; - struct margo_timer_list* timer_lst = __margo_get_timer_list(timer->mid); - timer->expiration = ABT_get_wtime() + (timeout_ms / 1000); + if (already_started || timer->canceled || timer_lst->destroy_requested) + return -1; + + timer->expiration = ABT_get_wtime() + (timeout_ms / 1000); __margo_timer_queue(timer_lst, timer); return 0; @@ -242,20 +302,68 @@ int margo_timer_start(margo_timer_t timer, double timeout_ms) int margo_timer_cancel(margo_timer_t timer) { - struct margo_timer_list* timer_lst = __margo_get_timer_list(timer->mid); - - ABT_mutex_lock(timer_lst->mutex); + // Mark the timer as canceled to prevent existing ULTs that have been + // submitted but haven't started from calling the callback and to prevent + // calls to margo_timer_start on this timer from succeeding. + timer->canceled = true; + struct margo_timer_list* timer_lst = get_timer_list(timer->mid); + + // Remove the timer from the list of pending timers + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); if (timer->prev || timer->next) DL_DELETE(timer_lst->queue_head, timer); - ABT_mutex_unlock(timer_lst->mutex); + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); timer->prev = timer->next = NULL; + // Wait for any remaining ULTs + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + while (timer->num_pending != 0) { + ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE(&timer->cv), + ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + } + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + + // Uncancel the timer (so we can call margo_timer_start again) + timer->canceled = false; + + return 0; +} + +int margo_timer_cancel_many(size_t n, margo_timer_t* timers) +{ + if (n == 0) return 0; + struct margo_timer_list* timer_lst = get_timer_list(timers[0]->mid); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + for (int i = 0; i < n; ++i) { + // Mark each timer as canceled + timers[i]->canceled = true; + // Remove each timer from the list of pending timers + if (timers[i]->prev || timers[i]->next) + DL_DELETE(timer_lst->queue_head, timers[i]); + timers[i]->prev = timers[i]->next = NULL; + } + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer_lst->mutex)); + + for (int i = 0; i < n; ++i) { + // Wait for any remaining ULTs for each timer + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timers[i]->mutex)); + while (timers[i]->num_pending != 0) { + ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE(&timers[i]->cv), + ABT_MUTEX_MEMORY_GET_HANDLE(&timers[i]->mutex)); + } + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timers[i]->mutex)); + // Uncanceled each timer + timers[i]->canceled = false; + } return 0; } int margo_timer_destroy(margo_timer_t timer) { - margo_timer_cancel(timer); - free(timer); + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + bool can_destroy = !timer->prev && !timer->next && !timer->num_pending; + timer->destroy_requested = !can_destroy; + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&timer->mutex)); + if (can_destroy) timer_cleanup(timer); return 0; } diff --git a/tests/unit-tests/margo-timer.c b/tests/unit-tests/margo-timer.c index f315aad..0862de4 100644 --- a/tests/unit-tests/margo-timer.c +++ b/tests/unit-tests/margo-timer.c @@ -4,6 +4,7 @@ * See COPYRIGHT in top-level directory. */ #include +#include #include #include #include "munit/munit.h" @@ -44,7 +45,14 @@ static void test_context_tear_down(void* fixture) static void timer_cb(void* data) { struct test_context* ctx = (struct test_context*)data; - ctx->flag = 1; + ctx->flag += 1; +} + +static void timer_with_sleep_cb(void* data) +{ + struct test_context* ctx = (struct test_context*)data; + margo_thread_sleep(ctx->mid, 1000); // sleep for 1 second + ctx->flag += 1; } static MunitResult test_margo_timer_start(const MunitParameter params[], @@ -80,8 +88,8 @@ static MunitResult test_margo_timer_start(const MunitParameter params[], return MUNIT_OK; } -static MunitResult test_margo_timer_cancel(const MunitParameter params[], - void* data) +static MunitResult test_margo_timer_cancel_before_ult_submitted(const MunitParameter params[], + void* data) { (void)params; (void)data; @@ -95,25 +103,207 @@ static MunitResult test_margo_timer_cancel(const MunitParameter params[], munit_assert_not_null(timer); ctx->flag = 0; - + // Start the timer with a 500ms timeout ret = margo_timer_start(timer, 500); munit_assert_int(ret, ==, 0); + // Sleep for 100ms (the timer won't have submitted its ULT) margo_thread_sleep(ctx->mid, 100); + // Cancel the timer ret = margo_timer_cancel(timer); munit_assert_int(ret, ==, 0); + // Wait until after the timer's deadline margo_thread_sleep(ctx->mid, 900); + // Ensure that the callback hasn't run munit_assert_int(ctx->flag, ==, 0); + // Destroy the timer + ret = margo_timer_destroy(timer); + munit_assert_int(ret, ==, 0); + + return MUNIT_OK; +} + +static MunitResult test_margo_timer_cancel_after_ult_started(const MunitParameter params[], + void* data) +{ + (void)params; + (void)data; + int ret; + margo_timer_t timer = MARGO_TIMER_NULL; + + struct test_context* ctx = (struct test_context*)data; + + ret = margo_timer_create(ctx->mid, timer_with_sleep_cb, data, &timer); + munit_assert_int(ret, ==, 0); + munit_assert_not_null(timer); + + ctx->flag = 0; + + // Start the timer with a 100ms timeout + ret = margo_timer_start(timer, 100); + munit_assert_int(ret, ==, 0); + + // Sleep 200ms, ensuring that the ULT has been submitted + margo_thread_sleep(ctx->mid, 200); + + // The ULT takes 1000ms to complete but it's already started so it won't be cancelled + ret = margo_timer_cancel(timer); + munit_assert_int(ret, ==, 0); + + // margo_timer_cancel will have waited for the ULT to complete, so no need to sleep + // and the flag should have been set to 1 + munit_assert_int(ctx->flag, ==, 1); + ret = margo_timer_destroy(timer); munit_assert_int(ret, ==, 0); return MUNIT_OK; } +static void just_sleep(void* arg) { + (void)arg; + sleep(1); +} + +static MunitResult test_margo_timer_cancel_before_ult_started(const MunitParameter params[], + void* data) +{ + (void)params; + (void)data; + int ret; + margo_timer_t timer = MARGO_TIMER_NULL; + + struct test_context* ctx = (struct test_context*)data; + + // Create a pool that will be associated with an ES only later, so + // we can submit timers to it but the timer ULTs won't be executed + // until we want them to. + ABT_pool pool; + ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, true, &pool); + munit_assert_int(ret, ==, 0); + + // As the first ULT in this pool, we push a "just_sleep" to give us some + // time to call margo_timer_cancel from the main ES before the ES that + // runs the timers ULT start executing them. + ret = ABT_thread_create(pool, just_sleep, NULL, ABT_THREAD_ATTR_NULL, NULL); + munit_assert_int(ret, ==, 0); + + // Create a timer that will submit its ULT on the above pool + ret = margo_timer_create_with_pool(ctx->mid, timer_with_sleep_cb, data, pool, &timer); + munit_assert_int(ret, ==, 0); + munit_assert_not_null(timer); + + ctx->flag = 0; + + // Start the timer with a 100ms timeout + ret = margo_timer_start(timer, 100); + munit_assert_int(ret, ==, 0); + + // Sleep 200ms, ensuring that the ULT has been submitted to the pool + margo_thread_sleep(ctx->mid, 200); + + // Create an ES to run the pool's ULTs + ABT_xstream xstream; + ret = ABT_xstream_create_basic(ABT_SCHED_BASIC, 1, &pool, ABT_SCHED_CONFIG_NULL, &xstream); + munit_assert_int(ret, ==, 0); + + // The ULT hasn't had a chance to start when we cancel it + ret = margo_timer_cancel(timer); + munit_assert_int(ret, ==, 0); + + // The callback shouldn't have run, flag should be 0 + munit_assert_int(ctx->flag, ==, 0); + + ret = margo_timer_destroy(timer); + munit_assert_int(ret, ==, 0); + + // Terminate the xstream + ABT_xstream_join(xstream); + ABT_xstream_free(&xstream); + + return MUNIT_OK; +} + +static MunitResult test_margo_timer_cancel_many(const MunitParameter params[], + void* data) +{ + (void)params; + (void)data; + int ret; + margo_timer_t timers[3] = { MARGO_TIMER_NULL, MARGO_TIMER_NULL, MARGO_TIMER_NULL}; + + struct test_context* ctx = (struct test_context*)data; + ctx->flag = 0; + + // Create a pool that will be associated with an ES only later, so + // we can submit timers to it but the timer ULTs won't be executed + // until we want them to. + ABT_pool pool; + ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, true, &pool); + munit_assert_int(ret, ==, 0); + + // As the first ULT in this pool, we push a "just_sleep" to give us some + // time to call margo_timer_cancel from the main ES before the ES that + // runs the timers ULT start executing them. + ret = ABT_thread_create(pool, just_sleep, NULL, ABT_THREAD_ATTR_NULL, NULL); + munit_assert_int(ret, ==, 0); + + // Create a timer that will submit its ULT on the above pool + ret = margo_timer_create_with_pool(ctx->mid, timer_with_sleep_cb, data, pool, &timers[0]); + munit_assert_int(ret, ==, 0); + munit_assert_not_null(timers[0]); + + // Create 2 other timers not in the above pool + ret = margo_timer_create(ctx->mid, timer_with_sleep_cb, data, &timers[1]); + munit_assert_int(ret, ==, 0); + ret = margo_timer_create(ctx->mid, timer_with_sleep_cb, data, &timers[2]); + munit_assert_int(ret, ==, 0); + + // Start the timer 0 with a 100ms timeout + ret = margo_timer_start(timers[0], 100); + munit_assert_int(ret, ==, 0); + + // Start timer 1 with a deadline long in the future + // (will be canceled before it becomes a ULT) + ret = margo_timer_start(timers[1], 500); + + // Start timer 2 with a short deadline so it's completed by the time we call cancel + ret = margo_timer_start(timers[2], 100); + + // Sleep 200ms, ensuring that the ULT for timer 0 has been submitted to the pool + // and timer 2 has executed. + margo_thread_sleep(ctx->mid, 200); + + // Create an ES to run the pool's ULTs + ABT_xstream xstream; + ret = ABT_xstream_create_basic(ABT_SCHED_BASIC, 1, &pool, ABT_SCHED_CONFIG_NULL, &xstream); + munit_assert_int(ret, ==, 0); + + // Cancel all the timers + ret = margo_timer_cancel_many(3, timers); + munit_assert_int(ret, ==, 0); + + // The callback should have run only once, flag should be 1 + munit_assert_int(ctx->flag, ==, 1); + + ret = margo_timer_destroy(timers[0]); + munit_assert_int(ret, ==, 0); + ret = margo_timer_destroy(timers[1]); + munit_assert_int(ret, ==, 0); + ret = margo_timer_destroy(timers[2]); + munit_assert_int(ret, ==, 0); + + // Terminate the xstream + ABT_xstream_join(xstream); + ABT_xstream_free(&xstream); + + return MUNIT_OK; +} + static MunitResult test_margo_timer_destroy(const MunitParameter params[], void* data) { @@ -130,17 +320,22 @@ static MunitResult test_margo_timer_destroy(const MunitParameter params[], ctx->flag = 0; + // Start timer with 500ms timeout ret = margo_timer_start(timer, 500); munit_assert_int(ret, ==, 0); + // Sleep for 100ms, the timer won't have submitted its ULT yet margo_thread_sleep(ctx->mid, 100); + // Destroy the timer. This won't cancel it. ret = margo_timer_destroy(timer); munit_assert_int(ret, ==, 0); + // Sleep long enough for the timer to actually fire. margo_thread_sleep(ctx->mid, 900); - munit_assert_int(ctx->flag, ==, 0); + // Ensure the flag is now 1 + munit_assert_int(ctx->flag, ==, 1); return MUNIT_OK; } @@ -153,8 +348,14 @@ static MunitParameterEnum test_params[] static MunitTest test_suite_tests[] = { {(char*)"/margo_timer/start", test_margo_timer_start, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, - {(char*)"/margo_timer/cancel", test_margo_timer_cancel, test_context_setup, - test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + {(char*)"/margo_timer/cancel-before-ult-submitted", test_margo_timer_cancel_before_ult_submitted, + test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + {(char*)"/margo_timer/cancel-before-ult-started", test_margo_timer_cancel_after_ult_started, + test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + {(char*)"/margo_timer/cancel-after-ult-started", test_margo_timer_cancel_after_ult_started, + test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + {(char*)"/margo_timer/cancel-many", test_margo_timer_cancel_many, + test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, {(char*)"/margo_timer/destroy", test_margo_timer_destroy, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},