Skip to content

Commit

Permalink
Better object lifetime management in margo (#285)
Browse files Browse the repository at this point in the history
* added proper refcounting of pools and xstreams managed by margo

* changed finalization of argobots and mercury

* changed some function names

* better cleanup

* modified some comments and removed deprecated functions

* added functions to get the refcount

* fixed refcount once and for all?

* fixed margo_xstream_ref_count and added more tests

* fix refcounting in margo_instance_release
  • Loading branch information
mdorier authored Aug 22, 2024
1 parent f61bf8a commit a840d90
Show file tree
Hide file tree
Showing 9 changed files with 986 additions and 277 deletions.
483 changes: 410 additions & 73 deletions include/margo-config.h

Large diffs are not rendered by default.

29 changes: 15 additions & 14 deletions src/margo-abt-config.c
Original file line number Diff line number Diff line change
Expand Up @@ -1418,27 +1418,27 @@ bool __margo_abt_add_external_xstream(margo_abt_t* abt,
return ret;
}

bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index)
hg_return_t __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index)
{
if (index >= abt->pools_len) {
margo_error(abt->mid, "Invalid index %u in __margo_abt_remove_pool",
index);
return false;
return HG_INVALID_ARG;
}
margo_abt_pool_t* pool = &(abt->pools[index]);
if (pool->num_rpc_ids) {
if (pool->refcount) {
margo_error(abt->mid,
"Cannot remove pool %s at index %u "
"because it is used by %u RPC handlers",
pool->name, index, pool->num_rpc_ids);
return false;
"because it is used",
pool->name, index);
return HG_PERMISSION;
}
if (pool->num_xstreams) {
margo_error(abt->mid,
"Cannot remove pool %s at index %u "
"because it is used by %u running xstreams",
pool->name, index, pool->num_xstreams);
return false;
return HG_PERMISSION;
}
size_t pool_size = 0;
int ret = ABT_pool_get_total_size(pool->pool, &pool_size);
Expand All @@ -1448,36 +1448,37 @@ bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index)
" (ABT_pool_get_total_size returned %d)"
" in __margo_abt_pool_destroy",
ret);
return false;
return HG_OTHER_ERROR;
} else if (pool_size != 0) {
margo_error(0, "Destroying a pool (%s) that is not empty", pool->name);
return false;
return HG_PERMISSION;
}
__margo_abt_pool_destroy(pool, abt);
margo_abt_pool_t* last_pool = &(abt->pools[abt->pools_len - 1]);
if (index != abt->pools_len - 1) memcpy(pool, last_pool, sizeof(*pool));
abt->pools_len -= 1;
memset(last_pool, 0, sizeof(*last_pool));
return true;
return HG_SUCCESS;
}

bool __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index)
hg_return_t __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index)
{
if (index >= abt->xstreams_len) {
margo_error(abt->mid, "Invalid index %u in __margo_abt_remove_xstream",
index);
return false;
return HG_INVALID_ARG;
}
margo_abt_xstream_t* xstream = &(abt->xstreams[index]);
if (strcmp(xstream->name, "__primary__") == 0) {
margo_error(abt->mid, "Cannot remove primary xstream");
return false;
return HG_PERMISSION;
}
if (xstream->refcount) { return HG_PERMISSION; }
__margo_abt_xstream_destroy(xstream, abt);
margo_abt_xstream_t* last_xstream = &(abt->xstreams[abt->xstreams_len - 1]);
if (index != abt->xstreams_len - 1)
memcpy(xstream, last_xstream, sizeof(*xstream));
abt->xstreams_len -= 1;
memset(last_xstream, 0, sizeof(*last_xstream));
return true;
return HG_SUCCESS;
}
20 changes: 11 additions & 9 deletions src/margo-abt-config.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ typedef struct margo_abt margo_abt_t;
* margo is responsible for explicitly free'ing the pool or not.
*/
typedef struct margo_abt_pool {
char* name;
ABT_pool pool;
char* kind;
optional_char* access; /* Unknown for custom user pools */
_Atomic(uint32_t) num_rpc_ids; /* Number of RPC ids that use this pool */
char* name;
ABT_pool pool;
char* kind;
optional_char* access; /* Unknown for custom user pools */
_Atomic(uint32_t)
refcount; /* Number of RPC ids and external uses of this pool */
_Atomic(uint32_t) num_xstreams; /* Number of xstreams that use this pool */
bool margo_free_flag; /* flag if Margo is responsible for freeing */
bool used_by_primary; /* flag indicating the this pool is used by the
Expand Down Expand Up @@ -108,8 +109,9 @@ void __margo_abt_sched_destroy(margo_abt_sched_t* sched);
* margo is responsible for explicitly free'ing the ES or not.
*/
typedef struct margo_abt_xstream {
char* name;
ABT_xstream xstream;
char* name;
ABT_xstream xstream;
_Atomic(uint32_t) refcount; /* Number of external use this xstream */
struct margo_abt_sched sched;
bool margo_free_flag; /* flag if Margo is responsible for freeing */
} margo_abt_xstream_t;
Expand Down Expand Up @@ -181,6 +183,6 @@ bool __margo_abt_add_external_pool(margo_abt_t* abt,
bool __margo_abt_add_external_xstream(margo_abt_t* abt,
const char* name,
ABT_xstream xstream);
bool __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index);
bool __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index);
hg_return_t __margo_abt_remove_pool(margo_abt_t* abt, uint32_t index);
hg_return_t __margo_abt_remove_xstream(margo_abt_t* abt, uint32_t index);
#endif
Loading

0 comments on commit a840d90

Please sign in to comment.