Skip to content

Commit

Permalink
Add mutex lock/unlock when iterating file_task_list_head
Browse files Browse the repository at this point in the history
  • Loading branch information
houjun committed Jul 17, 2023
1 parent aad1bef commit 77a43d7
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/h5_async_vol.c
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ func_log_uint64_1(const char *func, const char *name, uint64_t val)
gettimeofday(&now, NULL);

if (async_instance_g && (async_instance_g->mpi_rank == ASYNC_DBG_MSG_RANK || -1 == ASYNC_DBG_MSG_RANK))
fprintf(fout_g, " [%s DBG] %ld.%06ld: [%s], push=%d, %s: %llu\n", type, now.tv_sec, now.tv_usec,
fprintf(fout_g, " [%s DBG] %ld.%06ld: [%s], push=%d, %s: %lu\n", type, now.tv_sec, now.tv_usec,
func, async_instance_g->start_abt_push, name, val);
#endif
return;
Expand Down Expand Up @@ -2298,6 +2298,12 @@ get_n_running_task_in_queue(async_task_t *task, const char *call_func)
return -1;
}

if (task->async_obj->file_task_list_mutex &&
ABT_mutex_lock(task->async_obj->file_task_list_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__);
return -1;
}

if (ABT_pool_get_total_size(*(task->async_obj->pool_ptr), &pool_size) != ABT_SUCCESS)
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_pool_get_total_size\n", __func__);

Expand All @@ -2320,6 +2326,11 @@ get_n_running_task_in_queue(async_task_t *task, const char *call_func)
}
}

if (ABT_mutex_unlock(task->async_obj->file_async_obj->file_task_list_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__);
return -1;
}

if (ABT_mutex_unlock(async_instance_g->qhead.head_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__);
return -1;
Expand Down Expand Up @@ -2367,6 +2378,11 @@ get_n_running_task_in_queue_obj(H5VL_async_t *async_obj, const char *call_func)
if (async_obj->pool_ptr && ABT_pool_get_total_size(*(async_obj->pool_ptr), &pool_size) != ABT_SUCCESS)
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_pool_get_total_size\n", __func__);

if (async_obj->file_task_list_mutex && ABT_mutex_lock(async_obj->file_task_list_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__);
return -1;
}

if (pool_size == 0) {
if (ABT_thread_self(&self_thread) != ABT_SUCCESS)
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_thread_self\n", __func__);
Expand All @@ -2382,6 +2398,11 @@ get_n_running_task_in_queue_obj(H5VL_async_t *async_obj, const char *call_func)
}
}

if (ABT_mutex_unlock(async_obj->file_async_obj->file_task_list_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__);
return -1;
}

if (ABT_mutex_unlock(async_instance_g->qhead.head_mutex) != ABT_SUCCESS) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__);
return -1;
Expand Down Expand Up @@ -5857,13 +5878,13 @@ async_attr_open(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_loc_
{
H5VL_async_t * async_obj = NULL;
async_task_t * async_task = NULL;
async_task_t * task_elt = NULL;
async_task_t * create_task = NULL;
/* async_task_t * task_elt = NULL; */
/* async_task_t * create_task = NULL; */
async_attr_open_args_t *args = NULL;
bool lock_parent = false;
bool is_blocking = false;
hbool_t acquired = false;
hbool_t found_create = false;
/* hbool_t found_create = false; */
unsigned int mutex_count = 1;

func_enter(__func__, name);
Expand Down

0 comments on commit 77a43d7

Please sign in to comment.