diff --git a/include/margo-config.h b/include/margo-config.h index 1edd45b..b630e9b 100644 --- a/include/margo-config.h +++ b/include/margo-config.h @@ -79,6 +79,9 @@ struct margo_pool_info { * @brief Find information about a margo-managed pool by * searching for the ABT_pool handle. * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * * @param [in] mid Margo instance. * @param [in] handle ABT_pool handle. * @param [out] info Pointer to margo_pool_info struct to fill. @@ -93,8 +96,11 @@ hg_return_t margo_find_pool_by_handle(margo_instance_id mid, * @brief Find information about a margo-managed pool by * searching for the name. * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * * @param [in] mid Margo instance. - * @param [in] handle ABT_pool handle. + * @param [in] name Name of the pool. * @param [out] info Pointer to margo_pool_info struct to fill. * * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). @@ -108,7 +114,7 @@ hg_return_t margo_find_pool_by_name(margo_instance_id mid, * searching for the index. * * @param [in] mid Margo instance. - * @param [in] name Name of the pool to find. + * @param [in] index Index of the pool to find. * @param [out] info Pointer to margo_pool_info struct to fill. * * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). @@ -117,6 +123,23 @@ hg_return_t margo_find_pool_by_index(margo_instance_id mid, uint32_t index, struct margo_pool_info* info); +/** + * @brief Find information about a margo-managed pool (generic version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_pool. + * @param [out] info Pointer to margo_pool_info struct to fill. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_find_pool(mid, args, info) \ + _Generic((args), \ + ABT_pool: margo_find_pool_by_handle, \ + const char*: margo_find_pool_by_name, \ + char*: margo_find_pool_by_name, \ + default: margo_find_pool_by_index \ + )(mid, args, info) + /** * @brief Creates a new Argobots pool according to the provided * JSON description (following the same format as the pool objects @@ -185,15 +208,209 @@ hg_return_t margo_remove_pool_by_name(margo_instance_id mid, const char* name); /** * @brief Same as margo_remove_pool_by_index by using the - * name of the pool to remove. + * handle of the pool to remove. * * @param mid Margo instance. - * @param name Name of the pool to remove. + * @param handle ABT_pool handle of the pool to remove. * * @return HG_SUCCESS or other error code. */ hg_return_t margo_remove_pool_by_handle(margo_instance_id mid, ABT_pool handle); +/** + * @brief Remove a margo-managed pool (generic version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_remove_pool(mid, args) \ + _Generic((args), \ + ABT_pool: margo_remove_pool_by_handle, \ + const char*: margo_remove_pool_by_name, \ + char*: margo_remove_pool_by_name, \ + default: margo_remove_pool_by_index \ + )(mid, args) + +/** + * @brief Increment the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] handle ABT_pool handle. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_incr_by_handle(margo_instance_id mid, + ABT_pool handle); + +/** + * @brief Increment the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] name Name of the pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_incr_by_name(margo_instance_id mid, + const char* name); + +/** + * @brief Increment the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @param [in] mid Margo instance. + * @param [in] index Index of the pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_incr_by_index(margo_instance_id mid, uint32_t index); + +/** + * @brief Increment the reference count of a margo-managed pool (generic + * version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_pool_ref_incr(mid, args) \ + _Generic((args), \ + ABT_pool: margo_pool_ref_incr_by_handle, \ + const char*: margo_pool_ref_incr_by_name, \ + char*: margo_pool_ref_incr_by_name, \ + default: margo_pool_ref_incr_by_index \ + )(mid, args) + +/** + * @brief Get the reference count of a pool managed by Margo. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] handle ABT_pool handle. + * @param [out] refcount Refcount. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_count_by_handle(margo_instance_id mid, + ABT_pool handle, + unsigned* refcount); + +/** + * @brief Get the reference count of a pool managed by Margo. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] name Name of the pool. + * @param [out] refcount Refcount. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_count_by_name(margo_instance_id mid, + const char* name, + unsigned* refcount); + +/** + * @brief Get the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @param [in] mid Margo instance. + * @param [in] index Index of the pool. + * @param [out] refcount Refcount. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_ref_count_by_index(margo_instance_id mid, + uint32_t index, + unsigned* refcount); + +/** + * @brief Get the reference count of a margo-managed pool (generic version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_pool. + * @param [out] refcount Refcount. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_pool_ref_count(mid, args, refcount) \ + _Generic((args), \ + ABT_pool: margo_pool_ref_count_by_handle, \ + const char*: margo_pool_ref_count_by_name, \ + char*: margo_pool_ref_count_by_name, \ + default: margo_pool_ref_count_by_index \ + )(mid, args, refcount) + +/** + * @brief Decrement the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] handle ABT_pool handle. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_release_by_handle(margo_instance_id mid, + ABT_pool handle); + +/** + * @brief Decrement the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the pool in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] name Name of the pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_release_by_name(margo_instance_id mid, const char* name); + +/** + * @brief Decrement the reference count of a pool managed by Margo. + * This reference count is used to prevent removal of pools that are in use. + * + * @param [in] mid Margo instance. + * @param [in] index Index of the pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_pool_release_by_index(margo_instance_id mid, uint32_t index); + +/** + * @brief Decrement the reference count of a margo-managed pool (generic + * version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_pool. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_pool_release(mid, args) \ + _Generic((args), \ + ABT_pool: margo_pool_release_by_handle, \ + const char*: margo_pool_release_by_name, \ + char*: margo_pool_release_by_name, \ + default: margo_pool_release_by_index \ + )(mid, args) + /** * @brief Structure used to retrieve information about margo-managed xstreams. */ @@ -207,6 +424,9 @@ struct margo_xstream_info { * @brief Find information about a margo-managed xstream by * searching for the ABT_xstream handle. * + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. + * * @param [in] mid Margo instance. * @param [in] handle ABT_xstream handle. * @param [out] info Pointer to margo_xstream_info struct to fill. @@ -221,8 +441,11 @@ hg_return_t margo_find_xstream_by_handle(margo_instance_id mid, * @brief Find information about a margo-managed xstream by * searching for the name. * + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. + * * @param [in] mid Margo instance. - * @param [in] handle ABT_xstream handle. + * @param [in] name Name of the ES. * @param [out] info Pointer to margo_xstream_info struct to fill. * * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). @@ -236,7 +459,7 @@ hg_return_t margo_find_xstream_by_name(margo_instance_id mid, * searching for the index. * * @param [in] mid Margo instance. - * @param [in] name Name of the xstream to find. + * @param [in] index Index of the ES. * @param [out] info Pointer to margo_xstream_info struct to fill. * * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). @@ -245,6 +468,23 @@ hg_return_t margo_find_xstream_by_index(margo_instance_id mid, uint32_t index, struct margo_xstream_info* info); +/** + * @brief Find information about a margo-managed ES (generic version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_xstream. + * @param [out] info Pointer to margo_xstream_info struct to fill. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_find_xstream(mid, args, info) \ + _Generic((args), \ + ABT_xstream: margo_find_xstream_by_handle, \ + const char*: margo_find_xstream_by_name, \ + char*: margo_find_xstream_by_name, \ + default: margo_find_xstream_by_index \ + )(mid, args, info) + /** * @brief Creates a new Argobots xstream according to the provided * JSON description (following the same format as the xstream objects @@ -323,10 +563,10 @@ hg_return_t margo_remove_xstream_by_name(margo_instance_id mid, /** * @brief Same as margo_remove_xstream_by_index by using the - * name of the xstream to remove. + * handle of the xstream to remove. * * @param mid Margo instance. - * @param name Name of the xstream to remove. + * @param handle ABT_xstream handle of the xstream to remove. * * @return HG_SUCCESS or other error code. */ @@ -334,120 +574,217 @@ hg_return_t margo_remove_xstream_by_handle(margo_instance_id mid, ABT_xstream handle); /** - * @brief This helper function transfers the ULT from one pool to another. - * It can be used to move ULTs out of a pool that we wish to remove. + * @brief Remove a margo-managed ES (generic version). * - * Note: this function will not remove ULTs that are blocked. - * The caller can check for any remaining blocked ULTs by calling - * ABT_pool_get_total_size(origin_pool, &size). + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_xstream. * - * @param origin_pool Origin pool. - * @param target_pool Target pool. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_remove_xstream(mid, args) \ + _Generic((args), \ + ABT_xstream: margo_remove_xstream_by_handle, \ + const char*: margo_remove_xstream_by_name, \ + char*: margo_remove_xstream_by_name, \ + default: margo_remove_xstream_by_index \ + )(mid, args) + +/** + * @brief Increment the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. * - * @return HG_SUCCESS or other error code. + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] handle ABT_xstream handle. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -hg_return_t margo_transfer_pool_content(ABT_pool origin_pool, - ABT_pool target_pool); +hg_return_t margo_xstream_ref_incr_by_handle(margo_instance_id mid, + ABT_xstream handle); /** - * @brief Get a pool from the configuration. + * @brief Increment the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. * * @param [in] mid Margo instance. - * @param [in] name Name of the pool. - * @param [out] pool Argobots pool. + * @param [in] name Name of the xstream. * - * @return 0 on success, -1 on failure + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -int margo_get_pool_by_name(margo_instance_id mid, - const char* name, - ABT_pool* pool) - DEPRECATED("Use margo_find_pool_by_name instead"); +hg_return_t margo_xstream_ref_incr_by_name(margo_instance_id mid, + const char* name); /** - * @brief Get a pool from the configuration. + * @brief Increment the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. * * @param [in] mid Margo instance. - * @param [in] index Index of the pool. - * @param [out] pool Argobots pool. + * @param [in] index Index of the xstream. * - * @return 0 on success, -1 on failure. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -int margo_get_pool_by_index(margo_instance_id mid, - unsigned index, - ABT_pool* pool) - DEPRECATED("Use margo_find_pool_by_index instead"); +hg_return_t margo_xstream_ref_incr_by_index(margo_instance_id mid, + uint32_t index); /** - * @brief Get the name of a pool at a given index. + * @brief Increment the reference count of a margo-managed xstream (generic + * version). * - * @param mid Margo instance. - * @param index Index of the pool. + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_xstream. * - * @return The null-terminated name, or NULL if index is invalid. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -const char* margo_get_pool_name(margo_instance_id mid, unsigned index) - DEPRECATED("Use margo_find_pool_by_index instead"); +#define margo_xstream_ref_incr(mid, args) \ + _Generic((args), \ + ABT_xstream: margo_xstream_ref_incr_by_handle, \ + const char*: margo_xstream_ref_incr_by_name, \ + char*: margo_xstream_ref_incr_by_name, \ + default: margo_xstream_ref_incr_by_index \ + )(mid, args) /** - * @brief Get the index of a pool from a given name. + * @brief Decrement the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. * - * @param mid Margo instance. - * @param name Name of the pool. + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. * - * @return The index of the pool, or -1 if the name is invalid. + * @param [in] mid Margo instance. + * @param [in] handle ABT_xstream handle. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_xstream_release_by_handle(margo_instance_id mid, + ABT_xstream handle); + +/** + * @brief Decrement the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. + * + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] name Name of the xstream. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_xstream_release_by_name(margo_instance_id mid, + const char* name); + +/** + * @brief Decrement the reference count of a xstream managed by Margo. + * This reference count is used to prevent removal of xstreams that are in use. + * + * @param [in] mid Margo instance. + * @param [in] index Index of the xstream. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +hg_return_t margo_xstream_release_by_index(margo_instance_id mid, + uint32_t index); + +/** + * @brief Decrement the reference count of a margo-managed xstream (generic + * version). + * + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_xstream. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). + */ +#define margo_xstream_release(mid, args) \ + _Generic((args), \ + ABT_xstream: margo_xstream_release_by_handle, \ + const char*: margo_xstream_release_by_name, \ + char*: margo_xstream_release_by_name, \ + default: margo_xstream_release_by_index \ + )(mid, args) + +/** + * @brief Get the reference count of an xstream managed by Margo. + * + * @warning: this operation should not be called in a hot path as it searches + * for the ES in a linear manner. + * + * @param [in] mid Margo instance. + * @param [in] handle ABT_xstream handle. + * @param [out] refcount Refcount. + * + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -int margo_get_pool_index(margo_instance_id mid, const char* name) - DEPRECATED("Use margo_find_pool_by_name instead"); +hg_return_t margo_xstream_ref_count_by_handle(margo_instance_id mid, + ABT_xstream handle, + unsigned* refcount); /** - * @brief Get an xstream from the configuration. + * @brief Get the reference count of an xstream managed by Margo. + * + * @warning: this operation should not be called in a hot path as it searches + * for the xstream in a linear manner. * * @param [in] mid Margo instance. * @param [in] name Name of the ES. - * @param [out] es ES. + * @param [out] refcount Refcount. * - * @return 0 on success, -1 on failure. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -int margo_get_xstream_by_name(margo_instance_id mid, - const char* name, - ABT_xstream* es) - DEPRECATED("Use margo_find_xstream_by_name instead"); +hg_return_t margo_xstream_ref_count_by_name(margo_instance_id mid, + const char* name, + unsigned* refcount); /** - * @brief Get an xstream from the configuration. + * @brief Get the reference count of an xstream managed by Margo. + * This reference count is used to prevent removal of pools that are in use. * * @param [in] mid Margo instance. - * @param [in] name Index of the ES. - * @param [out] es ES. + * @param [in] index Index of the ES. + * @param [out] refcount Refcount. * - * @return 0 on success, -1 on failure. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -int margo_get_xstream_by_index(margo_instance_id mid, - unsigned index, - ABT_xstream* es) - DEPRECATED("Use margo_find_xstream_by_index instead"); +hg_return_t margo_xstream_ref_count_by_index(margo_instance_id mid, + uint32_t index, + unsigned* refcount); /** - * @brief Get the name of a xstream at a given index. + * @brief Get the reference count of a margo-managed xstream (generic version). * - * @param mid Margo instance. - * @param index Index of the xstream. + * @param [in] mid Margo instance. + * @param [in] arg index, name, or ABT_xstream. + * @param [out] refcount Refcount. * - * @return The null-terminated name, or NULL if index is invalid. + * @return HG_SUCCESS or other HG error code (HG_INVALID_ARG or HG_NOENTRY). */ -const char* margo_get_xstream_name(margo_instance_id mid, unsigned index) - DEPRECATED("Use margo_find_xstream_by_index instead"); +#define margo_xstream_ref_count(mid, args, refcount) \ + _Generic((args), \ + ABT_xstream: margo_xstream_ref_count_by_handle, \ + const char*: margo_xstream_ref_count_by_name, \ + char*: margo_xstream_ref_count_by_name, \ + default: margo_xstream_ref_count_by_index \ + )(mid, args, refcount) /** - * @brief Get the index of an xstream from a given name. + * @brief This helper function transfers the ULT from one pool to another. + * It can be used to move ULTs out of a pool that we wish to remove. * - * @param mid Margo instance. - * @param name Name of the xstream. + * Note: this function will not remove ULTs that are blocked. + * The caller can check for any remaining blocked ULTs by calling + * ABT_pool_get_total_size(origin_pool, &size). * - * @return the index of the xstream, or -1 if the name is invalid. + * @param origin_pool Origin pool. + * @param target_pool Target pool. + * + * @return HG_SUCCESS or other error code. */ -int margo_get_xstream_index(margo_instance_id mid, const char* name) - DEPRECATED("Use margo_find_xstream_by_name instead"); +hg_return_t margo_transfer_pool_content(ABT_pool origin_pool, + ABT_pool target_pool); #ifdef __cplusplus } diff --git a/src/margo-abt-config.c b/src/margo-abt-config.c index 608dff3..08a40d6 100644 --- a/src/margo-abt-config.c +++ b/src/margo-abt-config.c @@ -1418,27 +1418,27 @@ bool __margo_abt_add_external_xstream(margo_abt_t* abt, return ret; } -bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index) +hg_return_t __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index) { if (index >= abt->pools_len) { margo_error(abt->mid, "Invalid index %u in __margo_abt_remove_pool", index); - return false; + return HG_INVALID_ARG; } margo_abt_pool_t* pool = &(abt->pools[index]); - if (pool->num_rpc_ids) { + if (pool->refcount) { margo_error(abt->mid, "Cannot remove pool %s at index %u " - "because it is used by %u RPC handlers", - pool->name, index, pool->num_rpc_ids); - return false; + "because it is used", + pool->name, index); + return HG_PERMISSION; } if (pool->num_xstreams) { margo_error(abt->mid, "Cannot remove pool %s at index %u " "because it is used by %u running xstreams", pool->name, index, pool->num_xstreams); - return false; + return HG_PERMISSION; } size_t pool_size = 0; int ret = ABT_pool_get_total_size(pool->pool, &pool_size); @@ -1448,36 +1448,37 @@ bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index) " (ABT_pool_get_total_size returned %d)" " in __margo_abt_pool_destroy", ret); - return false; + return HG_OTHER_ERROR; } else if (pool_size != 0) { margo_error(0, "Destroying a pool (%s) that is not empty", pool->name); - return false; + return HG_PERMISSION; } __margo_abt_pool_destroy(pool, abt); margo_abt_pool_t* last_pool = &(abt->pools[abt->pools_len - 1]); if (index != abt->pools_len - 1) memcpy(pool, last_pool, sizeof(*pool)); abt->pools_len -= 1; memset(last_pool, 0, sizeof(*last_pool)); - return true; + return HG_SUCCESS; } -bool __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index) +hg_return_t __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index) { if (index >= abt->xstreams_len) { margo_error(abt->mid, "Invalid index %u in __margo_abt_remove_xstream", index); - return false; + return HG_INVALID_ARG; } margo_abt_xstream_t* xstream = &(abt->xstreams[index]); if (strcmp(xstream->name, "__primary__") == 0) { margo_error(abt->mid, "Cannot remove primary xstream"); - return false; + return HG_PERMISSION; } + if (xstream->refcount) { return HG_PERMISSION; } __margo_abt_xstream_destroy(xstream, abt); margo_abt_xstream_t* last_xstream = &(abt->xstreams[abt->xstreams_len - 1]); if (index != abt->xstreams_len - 1) memcpy(xstream, last_xstream, sizeof(*xstream)); abt->xstreams_len -= 1; memset(last_xstream, 0, sizeof(*last_xstream)); - return true; + return HG_SUCCESS; } diff --git a/src/margo-abt-config.h b/src/margo-abt-config.h index a189a9b..4fce797 100644 --- a/src/margo-abt-config.h +++ b/src/margo-abt-config.h @@ -52,11 +52,12 @@ typedef struct margo_abt margo_abt_t; * margo is responsible for explicitly free'ing the pool or not. */ typedef struct margo_abt_pool { - char* name; - ABT_pool pool; - char* kind; - optional_char* access; /* Unknown for custom user pools */ - _Atomic(uint32_t) num_rpc_ids; /* Number of RPC ids that use this pool */ + char* name; + ABT_pool pool; + char* kind; + optional_char* access; /* Unknown for custom user pools */ + _Atomic(uint32_t) + refcount; /* Number of RPC ids and external uses of this pool */ _Atomic(uint32_t) num_xstreams; /* Number of xstreams that use this pool */ bool margo_free_flag; /* flag if Margo is responsible for freeing */ bool used_by_primary; /* flag indicating the this pool is used by the @@ -108,8 +109,9 @@ void __margo_abt_sched_destroy(margo_abt_sched_t* sched); * margo is responsible for explicitly free'ing the ES or not. */ typedef struct margo_abt_xstream { - char* name; - ABT_xstream xstream; + char* name; + ABT_xstream xstream; + _Atomic(uint32_t) refcount; /* Number of external use this xstream */ struct margo_abt_sched sched; bool margo_free_flag; /* flag if Margo is responsible for freeing */ } margo_abt_xstream_t; @@ -181,6 +183,6 @@ bool __margo_abt_add_external_pool(margo_abt_t* abt, bool __margo_abt_add_external_xstream(margo_abt_t* abt, const char* name, ABT_xstream xstream); -bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index); -bool __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index); +hg_return_t __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index); +hg_return_t __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index); #endif diff --git a/src/margo-config.c b/src/margo-config.c index 5f62c4b..2619509 100644 --- a/src/margo-config.c +++ b/src/margo-config.c @@ -168,6 +168,168 @@ hg_return_t margo_find_pool_by_index(margo_instance_id mid, return HG_SUCCESS; } +hg_return_t margo_pool_ref_incr_by_handle(margo_instance_id mid, + ABT_pool handle) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_POOL_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].pool == handle) { + mid->abt.pools[i].refcount++; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_ref_incr_by_name(margo_instance_id mid, const char* name) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].name == NULL) continue; + if (strcmp(mid->abt.pools[i].name, name) == 0) { + mid->abt.pools[i].refcount++; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_ref_incr_by_index(margo_instance_id mid, uint32_t index) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.pools_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + mid->abt.pools[index].refcount++; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + +hg_return_t margo_pool_ref_count_by_handle(margo_instance_id mid, + ABT_pool handle, + unsigned* refcount) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_POOL_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].pool == handle) { + *refcount = mid->abt.pools[i].refcount; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_ref_count_by_name(margo_instance_id mid, + const char* name, + unsigned* refcount) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].name == NULL) continue; + if (strcmp(mid->abt.pools[i].name, name) == 0) { + *refcount = mid->abt.pools[i].refcount; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_ref_count_by_index(margo_instance_id mid, + uint32_t index, + unsigned* refcount) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.pools_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + *refcount = mid->abt.pools[index].refcount; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + +hg_return_t margo_pool_release_by_handle(margo_instance_id mid, ABT_pool handle) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_POOL_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].pool == handle) { + if (mid->abt.pools[i].refcount == 0) { + __margo_abt_unlock(&mid->abt); + ret = HG_PERMISSION; + break; + } + mid->abt.pools[i].refcount--; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_release_by_name(margo_instance_id mid, const char* name) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.pools_len; ++i) { + if (mid->abt.pools[i].name == NULL) continue; + if (strcmp(mid->abt.pools[i].name, name) == 0) { + if (mid->abt.pools[i].refcount == 0) { + __margo_abt_unlock(&mid->abt); + ret = HG_PERMISSION; + break; + } + mid->abt.pools[i].refcount--; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_pool_release_by_index(margo_instance_id mid, uint32_t index) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.pools_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + if (mid->abt.pools[index].refcount == 0) { + __margo_abt_unlock(&mid->abt); + return HG_PERMISSION; + } + mid->abt.pools[index].refcount--; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + hg_return_t margo_add_pool_from_json(margo_instance_id mid, const char* json_str, struct margo_pool_info* info) @@ -279,8 +441,7 @@ hg_return_t margo_remove_pool_by_index(margo_instance_id mid, uint32_t index) = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_pool, monitoring_args); - bool b = __margo_abt_remove_pool(&mid->abt, index); - if (!b) ret = HG_OTHER_ERROR; + ret = __margo_abt_remove_pool(&mid->abt, index); /* monitoring */ m_info.name = NULL; @@ -329,8 +490,7 @@ hg_return_t margo_remove_pool_by_name(margo_instance_id mid, const char* name) = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_pool, monitoring_args); - bool b = __margo_abt_remove_pool(&mid->abt, index); - if (!b) ret = HG_OTHER_ERROR; + ret = __margo_abt_remove_pool(&mid->abt, index); /* monitoring */ m_info.name = NULL; @@ -378,8 +538,7 @@ hg_return_t margo_remove_pool_by_handle(margo_instance_id mid, ABT_pool handle) = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_pool, monitoring_args); - bool b = __margo_abt_remove_pool(&mid->abt, index); - if (!b) ret = HG_OTHER_ERROR; + ret = __margo_abt_remove_pool(&mid->abt, index); /* monitoring */ m_info.name = NULL; @@ -431,6 +590,7 @@ hg_return_t margo_find_xstream_by_name(margo_instance_id mid, info->index = i; } ret = HG_SUCCESS; + break; } } __margo_abt_unlock(&mid->abt); @@ -456,6 +616,173 @@ hg_return_t margo_find_xstream_by_index(margo_instance_id mid, return HG_SUCCESS; } +hg_return_t margo_xstream_ref_incr_by_handle(margo_instance_id mid, + ABT_xstream handle) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_XSTREAM_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].xstream == handle) { + mid->abt.xstreams[i].refcount++; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_ref_incr_by_name(margo_instance_id mid, + const char* name) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].name == NULL) continue; + if (strcmp(mid->abt.xstreams[i].name, name) == 0) { + mid->abt.xstreams[i].refcount++; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_ref_incr_by_index(margo_instance_id mid, + uint32_t index) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.xstreams_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + mid->abt.xstreams[index].refcount++; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + +hg_return_t margo_xstream_ref_count_by_handle(margo_instance_id mid, + ABT_xstream handle, + unsigned* refcount) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_XSTREAM_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].xstream == handle) { + *refcount = mid->abt.xstreams[i].refcount; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_ref_count_by_name(margo_instance_id mid, + const char* name, + unsigned* refcount) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].name == NULL) continue; + if (strcmp(mid->abt.xstreams[i].name, name) == 0) { + *refcount = mid->abt.xstreams[i].refcount; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_ref_count_by_index(margo_instance_id mid, + uint32_t index, + unsigned* refcount) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.xstreams_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + *refcount = mid->abt.xstreams[index].refcount; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + +hg_return_t margo_xstream_release_by_handle(margo_instance_id mid, + ABT_xstream handle) +{ + if (mid == MARGO_INSTANCE_NULL || handle == ABT_XSTREAM_NULL) + return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].xstream == handle) { + if (mid->abt.xstreams[i].refcount == 0) { + __margo_abt_unlock(&mid->abt); + ret = HG_PERMISSION; + break; + } + mid->abt.xstreams[i].refcount--; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_release_by_name(margo_instance_id mid, + const char* name) +{ + if (mid == MARGO_INSTANCE_NULL || name == NULL) return HG_INVALID_ARG; + hg_return_t ret = HG_NOENTRY; + __margo_abt_lock(&mid->abt); + for (uint32_t i = 0; i < mid->abt.xstreams_len; ++i) { + if (mid->abt.xstreams[i].name == NULL) continue; + if (strcmp(mid->abt.xstreams[i].name, name) == 0) { + if (mid->abt.xstreams[i].refcount == 0) { + __margo_abt_unlock(&mid->abt); + ret = HG_PERMISSION; + break; + } + mid->abt.xstreams[i].refcount--; + ret = HG_SUCCESS; + break; + } + } + __margo_abt_unlock(&mid->abt); + return ret; +} + +hg_return_t margo_xstream_release_by_index(margo_instance_id mid, + uint32_t index) +{ + if (!mid) return HG_INVALID_ARG; + __margo_abt_lock(&mid->abt); + if (index >= mid->abt.xstreams_len) { + __margo_abt_unlock(&mid->abt); + return HG_INVALID_ARG; + } + if (mid->abt.xstreams[index].refcount == 0) { + __margo_abt_unlock(&mid->abt); + return HG_PERMISSION; + } + mid->abt.xstreams[index].refcount--; + __margo_abt_unlock(&mid->abt); + return HG_SUCCESS; +} + hg_return_t margo_add_xstream_from_json(margo_instance_id mid, const char* json_str, struct margo_xstream_info* info) @@ -559,16 +886,16 @@ hg_return_t margo_remove_xstream_by_index(margo_instance_id mid, uint32_t index) = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_xstream, monitoring_args); - bool ret = __margo_abt_remove_xstream(&mid->abt, index); + hg_return_t ret = __margo_abt_remove_xstream(&mid->abt, index); /* monitoring */ m_info.name = NULL; m_info.xstream = ABT_XSTREAM_NULL; - monitoring_args.ret = ret ? HG_SUCCESS : HG_OTHER_ERROR; + monitoring_args.ret = ret; __MARGO_MONITOR(mid, FN_END, remove_xstream, monitoring_args); __margo_abt_unlock(&mid->abt); - return ret ? HG_SUCCESS : HG_OTHER_ERROR; + return ret; } hg_return_t margo_remove_xstream_by_name(margo_instance_id mid, @@ -592,16 +919,16 @@ hg_return_t margo_remove_xstream_by_name(margo_instance_id mid, = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_xstream, monitoring_args); - bool ret = __margo_abt_remove_xstream(&mid->abt, index); + hg_return_t ret = __margo_abt_remove_xstream(&mid->abt, index); /* monitoring */ m_info.name = NULL; m_info.xstream = ABT_XSTREAM_NULL; - monitoring_args.ret = ret ? HG_SUCCESS : HG_OTHER_ERROR; + monitoring_args.ret = ret; __MARGO_MONITOR(mid, FN_END, remove_xstream, monitoring_args); __margo_abt_unlock(&mid->abt); - return ret ? HG_SUCCESS : HG_OTHER_ERROR; + return ret; } hg_return_t margo_remove_xstream_by_handle(margo_instance_id mid, @@ -625,17 +952,17 @@ hg_return_t margo_remove_xstream_by_handle(margo_instance_id mid, = {.info = &m_info, .ret = HG_SUCCESS}; __MARGO_MONITOR(mid, FN_START, remove_xstream, monitoring_args); - bool ret = __margo_abt_remove_xstream(&mid->abt, index); + hg_return_t ret = __margo_abt_remove_xstream(&mid->abt, index); __margo_abt_unlock(&mid->abt); /* monitoring */ m_info.name = NULL; m_info.xstream = ABT_XSTREAM_NULL; - monitoring_args.ret = ret ? HG_SUCCESS : HG_OTHER_ERROR; + monitoring_args.ret = ret; __MARGO_MONITOR(mid, FN_END, remove_xstream, monitoring_args); - return ret ? HG_SUCCESS : HG_OTHER_ERROR; + return ret; } hg_return_t margo_transfer_pool_content(ABT_pool origin_pool, @@ -661,83 +988,3 @@ hg_return_t margo_transfer_pool_content(ABT_pool origin_pool, #endif return HG_SUCCESS; } - -/* DEPRECATED FUNCTIONS */ - -// LCOV_EXCL_START -int margo_get_pool_by_name(margo_instance_id mid, - const char* name, - ABT_pool* pool) -{ - struct margo_pool_info info; - hg_return_t ret = margo_find_pool_by_name(mid, name, &info); - if (ret != HG_SUCCESS) return -1; - if (pool) *pool = info.pool; - return 0; -} - -int margo_get_pool_by_index(margo_instance_id mid, - unsigned index, - ABT_pool* pool) -{ - struct margo_pool_info info; - hg_return_t ret = margo_find_pool_by_index(mid, index, &info); - if (ret != HG_SUCCESS) return -1; - if (pool) *pool = info.pool; - return 0; -} - -const char* margo_get_pool_name(margo_instance_id mid, unsigned index) -{ - struct margo_pool_info info; - hg_return_t ret = margo_find_pool_by_index(mid, index, &info); - if (ret != HG_SUCCESS) return NULL; - return info.name; -} - -int margo_get_pool_index(margo_instance_id mid, const char* name) -{ - struct margo_pool_info info; - hg_return_t ret = margo_find_pool_by_name(mid, name, &info); - if (ret != HG_SUCCESS) return -1; - return info.index; -} - -int margo_get_xstream_by_name(margo_instance_id mid, - const char* name, - ABT_xstream* es) -{ - struct margo_xstream_info info; - hg_return_t ret = margo_find_xstream_by_name(mid, name, &info); - if (ret != HG_SUCCESS) return -1; - if (es) *es = info.xstream; - return 0; -} - -int margo_get_xstream_by_index(margo_instance_id mid, - unsigned index, - ABT_xstream* es) -{ - struct margo_xstream_info info; - hg_return_t ret = margo_find_xstream_by_index(mid, index, &info); - if (ret != HG_SUCCESS) return -1; - if (es) *es = info.xstream; - return 0; -} - -const char* margo_get_xstream_name(margo_instance_id mid, unsigned index) -{ - struct margo_xstream_info info; - hg_return_t ret = margo_find_xstream_by_index(mid, index, &info); - if (ret != HG_SUCCESS) return NULL; - return info.name; -} - -int margo_get_xstream_index(margo_instance_id mid, const char* name) -{ - struct margo_xstream_info info; - hg_return_t ret = margo_find_xstream_by_name(mid, name, &info); - if (ret != HG_SUCCESS) return -1; - return info.index; -} -// LCOV_EXCL_END diff --git a/src/margo-core.c b/src/margo-core.c index 02c6fef..2e58be7 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -142,26 +142,23 @@ static void margo_cleanup(margo_instance_id mid) MARGO_TRACE(mid, "Destroying handle cache"); __margo_handle_cache_destroy(mid); - /* finalize Mercury before anything else because this - * could trigger some margo_cb for forward operations that - * have not completed yet (cancelling them) */ - __margo_hg_destroy(&(mid->hg)); - if (mid->abt_profiling_enabled) { MARGO_TRACE(mid, "Dumping ABT profile"); margo_dump_abt_profiling(mid, "margo-profile", 1, NULL); } - /* monitoring */ - /* Note: Monitoring called before we continue to - * finalize because we need the margo instance to still - * be valid at this point. - */ - __MARGO_MONITOR(mid, FN_END, finalize, monitoring_args); + /* finalize Mercury before anything else because this + * could trigger some margo_cb for forward operations that + * have not completed yet (cancelling them) */ + MARGO_TRACE(mid, "Destroying Mercury environment"); + __margo_hg_destroy(&(mid->hg)); - if (mid->monitor && mid->monitor->finalize) - mid->monitor->finalize(mid->monitor->uargs); - free(mid->monitor); + MARGO_TRACE(mid, "Cleaning up RPC data"); + while (mid->registered_rpcs) { + next_rpc = mid->registered_rpcs->next; + free(mid->registered_rpcs); + mid->registered_rpcs = next_rpc; + } /* shut down pending timers */ MARGO_TRACE(mid, "Cleaning up pending timers"); @@ -173,16 +170,16 @@ static void margo_cleanup(margo_instance_id mid) ABT_mutex_free(&mid->pending_operations_mtx); ABT_key_free(&(mid->current_rpc_id_key)); - MARGO_TRACE(mid, "Cleaning up RPC data"); - while (mid->registered_rpcs) { - next_rpc = mid->registered_rpcs->next; - free(mid->registered_rpcs); - mid->registered_rpcs = next_rpc; - } + /* monitoring (destroyed before Argobots since it contains mutexes) */ + __MARGO_MONITOR(mid, FN_END, finalize, monitoring_args); + MARGO_TRACE(mid, "Destroying monitoring context"); + if (mid->monitor && mid->monitor->finalize) + mid->monitor->finalize(mid->monitor->uargs); + free(mid->monitor); + MARGO_TRACE(mid, "Destroying Argobots environment"); __margo_abt_destroy(&(mid->abt)); - - if (mid->refcount == 0) free(mid); + free(mid); MARGO_TRACE(0, "Completed margo_cleanup"); } @@ -205,12 +202,14 @@ hg_return_t margo_instance_release(margo_instance_id mid) { if (!mid) return HG_INVALID_ARG; if (!mid->refcount) return HG_OTHER_ERROR; - unsigned refcount = mid->refcount--; - if (refcount == 1) { + unsigned refcount = --mid->refcount; + if (refcount == 0) { if (!mid->finalize_flag) { + ++mid->refcount; // needed because margo_finalize will itself + // decrease it back to 0 margo_finalize(mid); } else { - free(mid); + margo_cleanup(mid); } } return HG_SUCCESS; @@ -267,10 +266,11 @@ void margo_finalize(margo_instance_id mid) MARGO_TRACE(mid, "Waiting for progress thread to complete"); ABT_thread_join(mid->hg_progress_tid); ABT_thread_free(&mid->hg_progress_tid); + mid->refcount--; ABT_mutex_lock(mid->finalize_mutex); mid->finalize_flag = true; - do_cleanup = mid->finalize_refcount == 0; + do_cleanup = mid->finalize_refcount == 0 && mid->refcount == 0; ABT_mutex_unlock(mid->finalize_mutex); ABT_cond_broadcast(mid->finalize_cond); @@ -303,7 +303,7 @@ void margo_finalize_and_wait(margo_instance_id mid) ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex); mid->finalize_refcount--; - do_cleanup = mid->finalize_refcount == 0; + do_cleanup = mid->finalize_refcount == 0 && mid->refcount == 0; ABT_mutex_unlock(mid->finalize_mutex); @@ -326,7 +326,7 @@ void margo_wait_for_finalize(margo_instance_id mid) ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex); mid->finalize_refcount--; - do_cleanup = mid->finalize_refcount == 0; + do_cleanup = mid->finalize_refcount == 0 && mid->refcount == 0; ABT_mutex_unlock(mid->finalize_mutex); @@ -587,7 +587,7 @@ hg_return_t margo_deregister(margo_instance_id mid, hg_id_t rpc_id) /* decrement the numner of RPC id used by the pool */ __margo_abt_lock(&mid->abt); int32_t index = __margo_abt_find_pool_by_handle(&mid->abt, data->pool); - if (index >= 0) mid->abt.pools[index].num_rpc_ids -= 1; + if (index >= 0) mid->abt.pools[index].refcount--; __margo_abt_unlock(&mid->abt); } @@ -2161,7 +2161,7 @@ static hg_id_t margo_register_internal(margo_instance_id mid, /* increment the number of RPC ids using the pool */ struct margo_pool_info pool_info; if (margo_find_pool_by_handle(mid, pool, &pool_info) == HG_SUCCESS) { - mid->abt.pools[pool_info.index].num_rpc_ids += 1; + mid->abt.pools[pool_info.index].refcount++; } finish: @@ -2337,10 +2337,9 @@ hg_return_t margo_rpc_set_pool(margo_instance_id mid, hg_id_t id, ABT_pool pool) int old_pool_entry_idx = __margo_abt_find_pool_by_handle(&mid->abt, data->pool); int new_pool_entry_idx = __margo_abt_find_pool_by_handle(&mid->abt, pool); - if (old_pool_entry_idx >= 0) - mid->abt.pools[old_pool_entry_idx].num_rpc_ids -= 1; + if (old_pool_entry_idx >= 0) mid->abt.pools[old_pool_entry_idx].refcount--; if (new_pool_entry_idx >= 0) - mid->abt.pools[new_pool_entry_idx].num_rpc_ids += 1; + mid->abt.pools[new_pool_entry_idx].refcount++; else margo_warning(mid, "Associating RPC with a pool not know to Margo"); __margo_abt_unlock(&mid->abt); diff --git a/src/margo-hg-config.c b/src/margo-hg-config.c index 182005f..23dddcd 100644 --- a/src/margo-hg-config.c +++ b/src/margo-hg-config.c @@ -427,10 +427,14 @@ struct json_object* __margo_hg_to_json(const margo_hg_t* hg) void __margo_hg_destroy(margo_hg_t* hg) { free((char*)hg->hg_init_info.sm_info_string); + hg->hg_init_info.sm_info_string = NULL; free((char*)hg->hg_init_info.na_init_info.auth_key); + hg->hg_init_info.na_init_info.auth_key = NULL; free((char*)hg->hg_init_info.na_init_info.ip_subnet); + hg->hg_init_info.na_init_info.ip_subnet = NULL; free(hg->self_addr_str); + hg->self_addr_str = NULL; if (hg->hg_class && hg->self_addr != HG_ADDR_NULL) HG_Addr_free(hg->hg_class, hg->self_addr); @@ -444,5 +448,6 @@ void __margo_hg_destroy(margo_hg_t* hg) HG_Finalize(hg->hg_class); hg->hg_class = NULL; } + memset(hg, 0, sizeof(*hg)); } diff --git a/src/margo-init.c b/src/margo-init.c index 379c4b3..10ab643 100644 --- a/src/margo-init.c +++ b/src/margo-init.c @@ -371,6 +371,8 @@ margo_instance_id margo_init_ext(const char* address, mid, ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid); if (ret != ABT_SUCCESS) goto error; + mid->refcount = 1; + finish: json_object_put(config); return mid; diff --git a/tests/unit-tests/margo-elasticity.c b/tests/unit-tests/margo-elasticity.c index 797b6df..2c7678a 100644 --- a/tests/unit-tests/margo-elasticity.c +++ b/tests/unit-tests/margo-elasticity.c @@ -35,7 +35,7 @@ static MunitResult add_pool_from_json(const MunitParameter params[], void* data) // search for it by index struct margo_pool_info pool_info2 = {0}; - ret = margo_find_pool_by_index(mid, pool_info.index, &pool_info2); + ret = margo_find_pool(mid, pool_info.index, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -43,7 +43,7 @@ static MunitResult add_pool_from_json(const MunitParameter params[], void* data) // search for it by name memset(&pool_info2, 0, sizeof(pool_info2)); - ret = margo_find_pool_by_name(mid, pool_info.name, &pool_info2); + ret = margo_find_pool(mid, pool_info.name, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -51,7 +51,7 @@ static MunitResult add_pool_from_json(const MunitParameter params[], void* data) // search for it by handle memset(&pool_info2, 0, sizeof(pool_info2)); - ret = margo_find_pool_by_handle(mid, pool_info.pool, &pool_info2); + ret = margo_find_pool(mid, pool_info.pool, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -104,7 +104,7 @@ static MunitResult add_pool_external(const MunitParameter params[], void* data) // search for it by index struct margo_pool_info pool_info2 = {0}; - ret = margo_find_pool_by_index(mid, pool_info.index, &pool_info2); + ret = margo_find_pool(mid, pool_info.index, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -112,7 +112,7 @@ static MunitResult add_pool_external(const MunitParameter params[], void* data) // search for it by name memset(&pool_info2, 0, sizeof(pool_info2)); - ret = margo_find_pool_by_name(mid, pool_info.name, &pool_info2); + ret = margo_find_pool(mid, pool_info.name, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -120,7 +120,7 @@ static MunitResult add_pool_external(const MunitParameter params[], void* data) // search for it by handle memset(&pool_info2, 0, sizeof(pool_info2)); - ret = margo_find_pool_by_handle(mid, pool_info.pool, &pool_info2); + ret = margo_find_pool(mid, pool_info.pool, &pool_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(pool_info2.index, ==, pool_info.index); munit_assert_string_equal(pool_info2.name, pool_info.name); @@ -179,7 +179,7 @@ static MunitResult remove_pool(const MunitParameter params[], void* data) struct margo_pool_info pool_info = {0}; // add a few pools from a JSON string - for(unsigned i = 0; i < 3; i++) { + for(unsigned i = 0; i < 4; i++) { const char* pool_desc_fmt = "{\"name\":\"my_pool_%u\", \"kind\":\"fifo_wait\", \"access\": \"mpmc\"}"; char pool_desc[1024]; sprintf(pool_desc, pool_desc_fmt, i); @@ -189,63 +189,81 @@ static MunitResult remove_pool(const MunitParameter params[], void* data) } // Get my_pool_0 and register an RPC handler with it - ret = margo_find_pool_by_name(mid, "my_pool_0", &pool_info); + ret = margo_find_pool(mid, "my_pool_0", &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); hg_id_t id0 = MARGO_REGISTER_PROVIDER(mid, "rpc_0", void, void, rpc_ult, 42, pool_info.pool); // Get my_pool_1 and register an RPC handler with it - ret = margo_find_pool_by_name(mid, "my_pool_1", &pool_info); + ret = margo_find_pool(mid, "my_pool_1", &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); hg_id_t id1 = MARGO_REGISTER_PROVIDER(mid, "rpc_1", void, void, rpc_ult, 42, pool_info.pool); int num_pools = margo_get_num_pools(mid); - munit_assert_int(num_pools, ==, 6); + munit_assert_int(num_pools, ==, 7); // failing case: removing by invalid index - ret = margo_remove_pool_by_index(mid, num_pools); + ret = margo_remove_pool(mid, num_pools); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing by invalid name - ret = margo_remove_pool_by_name(mid, "invalid"); + ret = margo_remove_pool(mid, "invalid"); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing by invalid ABT_pool - ret = margo_remove_pool_by_handle(mid, (ABT_pool)(0x1234)); + ret = margo_remove_pool(mid, (ABT_pool)(0x1234)); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing the primary ES's pool - ret = margo_remove_pool_by_name(mid, "__primary__"); + ret = margo_remove_pool(mid, "__primary__"); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing a pool that is still in use by some ES - ret = margo_remove_pool_by_name(mid, "__pool_1__"); + ret = margo_remove_pool(mid, "__pool_1__"); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access my_pool_1 - ret = margo_find_pool_by_name(mid, "my_pool_1", &pool_info); + ret = margo_find_pool(mid, "my_pool_1", &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); // failing case: removing my_pool_1 not allowed because rpc_1 registered with it - ret = margo_remove_pool_by_name(mid, "my_pool_1"); + ret = margo_remove_pool(mid, "my_pool_1"); munit_assert_int(ret, !=, HG_SUCCESS); // deregister rpc_1 should make it possible to then remove my_pool_1 margo_deregister(mid, id1); + // unless we refincr it before :-) + ret = margo_pool_ref_incr(mid, "my_pool_1"); + munit_assert_int(ret, ==, HG_SUCCESS); + + // check the ref count + unsigned refcount = 1234; + ret = margo_pool_ref_count(mid, "my_pool_1", &refcount); + munit_assert_int(ret, ==, HG_SUCCESS); + munit_assert_int(refcount, ==, 1); + + // failing case: remove my_pool_1 by name + ret = margo_remove_pool(mid, "my_pool_1"); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decref the pool + ret = margo_pool_release(mid, "my_pool_1"); + munit_assert_int(ret, ==, HG_SUCCESS); + // remove my_pool_1 by name - ret = margo_remove_pool_by_name(mid, "my_pool_1"); + ret = margo_remove_pool(mid, "my_pool_1"); munit_assert_int(ret, ==, HG_SUCCESS); // check the number of pools again num_pools = margo_get_num_pools(mid); - munit_assert_int(num_pools, ==, 5); + munit_assert_int(num_pools, ==, 6); // check that my_pool_1 is no longer present - ret = margo_find_pool_by_name(mid, "my_pool_1", &pool_info); + ret = margo_find_pool(mid, "my_pool_1", &pool_info); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access my_pool_2 - ret = margo_find_pool_by_name(mid, "my_pool_2", &pool_info); + ret = margo_find_pool(mid, "my_pool_2", &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); // failing case: put a ULT in my_pool_2 and try to remove the pool @@ -253,37 +271,83 @@ static MunitResult remove_pool(const MunitParameter params[], void* data) // going to start executing, so we then need to transfer the content // of my_pool_2 into a pool that will actually be able to run the work. ABT_thread_create(pool_info.pool, my_ult, NULL, ABT_THREAD_ATTR_NULL, NULL); - ret = margo_remove_pool_by_index(mid, pool_info.index); - munit_assert_int(ret, !=, HG_SUCCESS); + ret = margo_remove_pool(mid, pool_info.index); + munit_assert_int(ret, ==, HG_PERMISSION); + ret = margo_transfer_pool_content(pool_info.pool, handler_pool); munit_assert_int(ret, ==, HG_SUCCESS); // remove my_pool_2 by index - ret = margo_remove_pool_by_index(mid, pool_info.index); + ret = margo_remove_pool(mid, pool_info.index); munit_assert_int(ret, ==, HG_SUCCESS); - // check the number of xstreams again + // check the number of pools again num_pools = margo_get_num_pools(mid); - munit_assert_int(num_pools, ==, 4); + munit_assert_int(num_pools, ==, 5); // check that my_pool_2 is no longer present - ret = margo_find_pool_by_name(mid, "my_pool_2", &pool_info); + ret = margo_find_pool(mid, "my_pool_2", &pool_info); + munit_assert_int(ret, !=, HG_SUCCESS); + + // check that we can access my_pool_3 + ret = margo_find_pool(mid, "my_pool_3", &pool_info); + munit_assert_int(ret, ==, HG_SUCCESS); + + // increment the refcount of my_pool_3 by index + ret = margo_pool_ref_incr(mid, pool_info.index); + munit_assert_int(ret, ==, HG_SUCCESS); + + // check the ref count + ret = margo_pool_ref_count(mid, pool_info.index, &refcount); + munit_assert_int(ret, ==, HG_SUCCESS); + munit_assert_int(refcount, ==, 1); + + // failing case: remove my_pool_3 by index + ret = margo_remove_pool(mid, pool_info.index); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decrement the refcount of my_pool_3 by index + ret = margo_pool_release(mid, pool_info.index); + munit_assert_int(ret, ==, HG_SUCCESS); + + // remove my_pool_3 by index + ret = margo_remove_pool(mid, pool_info.index); + munit_assert_int(ret, ==, HG_SUCCESS); + + // check the number of pools again + num_pools = margo_get_num_pools(mid); + munit_assert_int(num_pools, ==, 4); + + // check that my_pool_3 is no longer present + ret = margo_find_pool(mid, "my_pool_3", &pool_info); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access my_pool_0 - ret = margo_find_pool_by_name(mid, "my_pool_0", &pool_info); + ret = margo_find_pool(mid, "my_pool_0", &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); // failing case: cannot removing my_pool_0 because it is used by rpc_0 - ret = margo_remove_pool_by_handle(mid, pool_info.pool); + ret = margo_remove_pool(mid, pool_info.pool); munit_assert_int(ret, !=, HG_SUCCESS); // move rpc_0 to another pool (__pool_1__, which is the default handler pool) // so we can remove my_pool_0 margo_rpc_set_pool(mid, id0, handler_pool); + // increase the refcount + ret = margo_pool_ref_incr(mid, pool_info.pool); + munit_assert_int(ret, ==, HG_SUCCESS); + + // failing: remove it by handle + ret = margo_remove_pool(mid, pool_info.pool); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decrease the refcount + ret = margo_pool_release(mid, pool_info.pool); + munit_assert_int(ret, ==, HG_SUCCESS); + // remove it by handle - ret = margo_remove_pool_by_handle(mid, pool_info.pool); + ret = margo_remove_pool(mid, pool_info.pool); munit_assert_int(ret, ==, HG_SUCCESS); // check the number of pools again @@ -291,7 +355,7 @@ static MunitResult remove_pool(const MunitParameter params[], void* data) munit_assert_int(num_pools, ==, 3); // check that my_pool_0 is no longer present - ret = margo_find_pool_by_name(mid, "my_pool_0", &pool_info); + ret = margo_find_pool(mid, "my_pool_0", &pool_info); munit_assert_int(ret, !=, HG_SUCCESS); margo_finalize(mid); @@ -318,7 +382,7 @@ static MunitResult add_xstream_from_json(const MunitParameter params[], void* da // search for it by index struct margo_xstream_info xstream_info2 = {0}; - ret = margo_find_xstream_by_index(mid, xstream_info.index, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.index, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -326,7 +390,7 @@ static MunitResult add_xstream_from_json(const MunitParameter params[], void* da // search for it by name memset(&xstream_info2, 0, sizeof(xstream_info2)); - ret = margo_find_xstream_by_name(mid, xstream_info.name, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.name, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -334,7 +398,7 @@ static MunitResult add_xstream_from_json(const MunitParameter params[], void* da // search for it by handle memset(&xstream_info2, 0, sizeof(xstream_info2)); - ret = margo_find_xstream_by_handle(mid, xstream_info.xstream, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.xstream, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -374,7 +438,7 @@ static MunitResult add_xstream_external(const MunitParameter params[], void* dat ABT_pool known_pools[3]; for(int i = 0; i < 3; i++) { struct margo_pool_info pool_info = {0}; - ret = margo_find_pool_by_index(mid, i, &pool_info); + ret = margo_find_pool(mid, i, &pool_info); munit_assert_int(ret, ==, HG_SUCCESS); known_pools[i] = pool_info.pool; } @@ -394,7 +458,7 @@ static MunitResult add_xstream_external(const MunitParameter params[], void* dat // search for it by index struct margo_xstream_info xstream_info2 = {0}; - ret = margo_find_xstream_by_index(mid, xstream_info.index, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.index, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -402,7 +466,7 @@ static MunitResult add_xstream_external(const MunitParameter params[], void* dat // search for it by name memset(&xstream_info2, 0, sizeof(xstream_info2)); - ret = margo_find_xstream_by_name(mid, xstream_info.name, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.name, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -410,7 +474,7 @@ static MunitResult add_xstream_external(const MunitParameter params[], void* dat // search for it by handle memset(&xstream_info2, 0, sizeof(xstream_info2)); - ret = margo_find_xstream_by_handle(mid, xstream_info.xstream, &xstream_info2); + ret = margo_find_xstream(mid, xstream_info.xstream, &xstream_info2); munit_assert_int(ret, ==, HG_SUCCESS); munit_assert_int(xstream_info2.index, ==, xstream_info.index); munit_assert_string_equal(xstream_info2.name, xstream_info.name); @@ -469,28 +533,46 @@ static MunitResult remove_xstream(const MunitParameter params[], void* data) munit_assert_int(num_xstreams, ==, 6); // failing case: removing by invalid index - ret = margo_remove_xstream_by_index(mid, num_xstreams); + ret = margo_remove_xstream(mid, num_xstreams); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing by invalid name - ret = margo_remove_xstream_by_name(mid, "invalid"); + ret = margo_remove_xstream(mid, "invalid"); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing by invalid ABT_xstream - ret = margo_remove_xstream_by_handle(mid, (ABT_xstream)(0x1234)); + ret = margo_remove_xstream(mid, (ABT_xstream)(0x1234)); munit_assert_int(ret, !=, HG_SUCCESS); // failing case: removing the primary ES - ret = margo_remove_xstream_by_name(mid, "__primary__"); + ret = margo_remove_xstream(mid, "__primary__"); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access __xstream_2__ struct margo_xstream_info xstream_info = {0}; - ret = margo_find_xstream_by_name(mid, "__xstream_2__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_2__", &xstream_info); + munit_assert_int(ret, ==, HG_SUCCESS); + + // increment refcount for __xstream_2__ by name + ret = margo_xstream_ref_incr(mid, "__xstream_2__"); + munit_assert_int(ret, ==, HG_SUCCESS); + + // get the refcount + unsigned refcount = 1234; + ret = margo_xstream_ref_count(mid, "__xstream_2__", &refcount); + munit_assert_int(ret, ==, HG_SUCCESS); + munit_assert_int(refcount, ==, 1); + + // failing case: remove __xstream_2__ by name (refcount is not 0) + ret = margo_remove_xstream(mid, "__xstream_2__"); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decrement refcount for __xstream_2__ + ret = margo_xstream_release(mid, "__xstream_2__"); munit_assert_int(ret, ==, HG_SUCCESS); // remove __xstream_2__ by name - ret = margo_remove_xstream_by_name(mid, "__xstream_2__"); + ret = margo_remove_xstream(mid, "__xstream_2__"); munit_assert_int(ret, ==, HG_SUCCESS); // check the number of xstreams again @@ -498,15 +580,32 @@ static MunitResult remove_xstream(const MunitParameter params[], void* data) munit_assert_int(num_xstreams, ==, 5); // check that __xstream_2__ is no longer present - ret = margo_find_xstream_by_name(mid, "__xstream_2__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_2__", &xstream_info); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access __xstream_4__ - ret = margo_find_xstream_by_name(mid, "__xstream_4__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_4__", &xstream_info); + munit_assert_int(ret, ==, HG_SUCCESS); + + // increment the refcount by index + ret = margo_xstream_ref_incr(mid, xstream_info.index); + munit_assert_int(ret, ==, HG_SUCCESS); + + // get the refcount by index + ret = margo_xstream_ref_count(mid, xstream_info.index, &refcount); + munit_assert_int(ret, ==, HG_SUCCESS); + munit_assert_int(refcount, ==, 1); + + // failing case: remove __xstream_4__ by index + ret = margo_remove_xstream(mid, xstream_info.index); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decrement the refcount by index + ret = margo_xstream_release(mid, xstream_info.index); munit_assert_int(ret, ==, HG_SUCCESS); // remove __xstream_4__ by index - ret = margo_remove_xstream_by_index(mid, xstream_info.index); + ret = margo_remove_xstream(mid, xstream_info.index); munit_assert_int(ret, ==, HG_SUCCESS); // check the number of xstreams again @@ -514,15 +613,32 @@ static MunitResult remove_xstream(const MunitParameter params[], void* data) munit_assert_int(num_xstreams, ==, 4); // check that __xstream_4__ is no longer present - ret = margo_find_xstream_by_name(mid, "__xstream_4__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_4__", &xstream_info); munit_assert_int(ret, !=, HG_SUCCESS); // check that we can access __xstream_3__ - ret = margo_find_xstream_by_name(mid, "__xstream_3__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_3__", &xstream_info); + munit_assert_int(ret, ==, HG_SUCCESS); + + // increment the refcount + ret = margo_xstream_ref_incr(mid, xstream_info.xstream); + munit_assert_int(ret, ==, HG_SUCCESS); + + // get the refcount + ret = margo_xstream_ref_count(mid, xstream_info.xstream, &refcount); + munit_assert_int(ret, ==, HG_SUCCESS); + munit_assert_int(refcount, ==, 1); + + // failing case: remove it by handle + ret = margo_remove_xstream(mid, xstream_info.xstream); + munit_assert_int(ret, ==, HG_PERMISSION); + + // decrement the refcount + ret = margo_xstream_release(mid, xstream_info.xstream); munit_assert_int(ret, ==, HG_SUCCESS); // remove it by handle - ret = margo_remove_xstream_by_handle(mid, xstream_info.xstream); + ret = margo_remove_xstream(mid, xstream_info.xstream); munit_assert_int(ret, ==, HG_SUCCESS); // check the number of xstreams again @@ -530,7 +646,7 @@ static MunitResult remove_xstream(const MunitParameter params[], void* data) munit_assert_int(num_xstreams, ==, 3); // check that __xstream_3__ is no longer present - ret = margo_find_xstream_by_name(mid, "__xstream_3__", &xstream_info); + ret = margo_find_xstream(mid, "__xstream_3__", &xstream_info); munit_assert_int(ret, !=, HG_SUCCESS); margo_finalize(mid); diff --git a/tests/unit-tests/margo-init.c b/tests/unit-tests/margo-init.c index dee606f..7873146 100644 --- a/tests/unit-tests/margo-init.c +++ b/tests/unit-tests/margo-init.c @@ -154,14 +154,14 @@ static MunitResult ref_incr_and_release(const MunitParameter params[], void* dat unsigned refcount = 1234; hg_return_t hret = margo_instance_ref_count(ctx->mid, &refcount); munit_assert_int(hret, ==, HG_SUCCESS); - munit_assert_int(refcount, ==, 0); + munit_assert_int(refcount, ==, 1); hret = margo_instance_ref_incr(ctx->mid); munit_assert_int(hret, ==, HG_SUCCESS); hret = margo_instance_ref_count(ctx->mid, &refcount); munit_assert_int(hret, ==, HG_SUCCESS); - munit_assert_int(refcount, ==, 1); + munit_assert_int(refcount, ==, 2); bool is_finalized = true; hret = margo_instance_is_finalized(ctx->mid, &is_finalized);