Add more debugging output, some minor bug fixes
houjun committed Jul 17, 2023
1 parent 6795905 · commit 72c70f9
Showing 1 changed file with 64 additions and 28 deletions: src/h5_async_vol.c
@@ -70,7 +70,7 @@ works, and perform publicly and display publicly, and to permit others to do so.
/* #define PRINT_ERROR_STACK 1 */
/* #define ENABLE_ASYNC_LOGGING */

- #define ASYNC_DBG_MSG_RANK 0
+ #define ASYNC_DBG_MSG_RANK -1

/* Record timing information */
#define ENABLE_TIMING 1
@@ -1047,6 +1047,23 @@ func_log_int1(const char *func, const char *name, int val)
return;
}

+ static inline void
+ func_log_uint64_1(const char *func, const char *name, uint64_t val)
+ {
+ #ifdef ENABLE_DBG_MSG
+ const char *type = "ASYNC VOL";
+ if (strstr(func, "_fn"))
+ type = " ABT";
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ if (async_instance_g && (async_instance_g->mpi_rank == ASYNC_DBG_MSG_RANK || -1 == ASYNC_DBG_MSG_RANK))
+ fprintf(fout_g, " [%s DBG] %ld.%06ld: [%s], push=%d, %s: %llu\n", type, now.tv_sec, now.tv_usec, func,
+ async_instance_g->start_abt_push, name, val);
+ #endif
+ return;
+ }
+
/** @defgroup ASYNC
* This group is for async VOL functionalities.
*/
@@ -2585,7 +2602,7 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func

done:
// Remove head if all its tasks have been pushed to Argobots pool
- if (qhead->queue && qhead->queue->task_list == NULL) {
+ if (qhead->queue && qhead->queue->task_list == NULL && qhead->queue->next == qhead->queue) {
DL_DELETE(qhead->queue, qhead->queue);
func_log(__func__, "removed empty queue task list");
}
@@ -9021,15 +9038,15 @@ async_dataset_read_fn(void *foo)
if (args->file_space_id[i] > H5S_PLIST && args->file_space_id[i] < H5S_UNLIMITED)
H5Sclose(args->file_space_id[i]);
}
- if (args->mem_type_id > 0) {
+ if (args->mem_type_id) {
free(args->mem_type_id);
args->mem_type_id = NULL;
}
- if (args->mem_space_id > 0) {
+ if (args->mem_space_id) {
free(args->mem_space_id);
args->mem_space_id = NULL;
}
- if (args->file_space_id > 0) {
+ if (args->file_space_id) {
free(args->file_space_id);
args->file_space_id = NULL;
}
@@ -9230,7 +9247,6 @@ async_dataset_read(async_instance_t *aid, size_t count, H5VL_async_t **parent_ob
}
}
else {
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
if (xfer_mode == H5FD_MPIO_COLLECTIVE)
add_task_to_queue(&aid->qhead, async_task, COLLECTIVE);
@@ -9725,7 +9741,6 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty
}
}
else {
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
H5FD_mpio_xfer_t xfer_mode;
H5Pget_dxpl_mpio(plist_id, &xfer_mode);
@@ -9807,23 +9822,40 @@ is_contig_memspace(hid_t memspace)
H5S_sel_type type;

if (memspace == H5S_SEL_ALL || memspace == H5S_SEL_NONE) {
func_log(__func__, "memspace is SEL_ALL/NONE, is contig");
return 1;
}

type = H5Sget_select_type(memspace);
if (type == H5S_SEL_POINTS) {
func_log(__func__, "memspace is SEL_POINTS, not contig");
return 0;
}
else if (type == H5S_SEL_HYPERSLABS) {
ndim = H5Sget_simple_extent_ndims(memspace);
- if (ndim != 1)
+ if (ndim != 1) {
+ func_log(__func__, "memspace dim > 1, not contig");
return 0;
+ }

nblocks = H5Sget_select_hyper_nblocks(memspace);
- if (nblocks == 1)
+ if (nblocks == 1) {
+ func_log(__func__, "nblock = 1, is contig");
return 1;
+ }
+ func_log(__func__, "nblock > 1, not contig");
return 0;
}
+ else if (type == H5S_SEL_ALL || type == H5S_SEL_NONE) {
+ func_log(__func__, "memspace is SEL_ALL/NONE, is contig");
+ return 1;
+ }
+ else {
+ func_log_int1(__func__, "unexpected memspace sel type", (int)type);
+ }
+
+ func_log_int1(__func__, "unknown memspace sel type", (int)type);
+
return 0;
}
#endif
@@ -9969,15 +10001,15 @@ async_dataset_write_fn(void *foo)
if (args->file_space_id[i] > H5S_PLIST && args->file_space_id[i] < H5S_UNLIMITED)
H5Sclose(args->file_space_id[i]);
}
- if (args->mem_type_id > 0) {
+ if (args->mem_type_id) {
free(args->mem_type_id);
args->mem_type_id = NULL;
}
- if (args->mem_space_id > 0) {
+ if (args->mem_space_id) {
free(args->mem_space_id);
args->mem_space_id = NULL;
}
- if (args->file_space_id > 0) {
+ if (args->file_space_id) {
free(args->file_space_id);
args->file_space_id = NULL;
}
@@ -9989,8 +10021,6 @@ async_dataset_write_fn(void *foo)
free(args->dset);
args->dset = NULL;
}
- free(args);
- task->args = NULL;

if (is_lock == 1) {
if (ABT_mutex_unlock(task->async_obj->obj_mutex) != ABT_SUCCESS)
@@ -10026,6 +10056,8 @@ async_dataset_write_fn(void *foo)
free(args->buf);
args->buf = NULL;
}
+ free(args);
+ task->args = NULL;

#ifdef ENABLE_TIMING
task->end_time = clock();
@@ -10110,22 +10142,24 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
#ifdef ENABLE_WRITE_MEMCPY
hsize_t buf_size = 0;
for (size_t i = 0; i < count; i++) {
func_log_int1(__func__, "start write memcpy iter", i);
if (parent_obj[i]->data_size > 0 &&
(args->file_space_id[i] == H5S_ALL || args->mem_space_id[i] == H5S_ALL)) {
buf_size = parent_obj[i]->data_size;
}
else {
buf_size = H5Tget_size(mem_type_id[i]) * H5Sget_select_npoints(mem_space_id[i]);
- #ifdef ENABLE_DBG_MSG
if (buf_size == 0)
fprintf(fout_g, " [ASYNC VOL ERROR] %s with getting dataset size\n", __func__);
#endif
fprintf(fout_g, " [ASYNC VOL INFO] %s dataset size is 0\n", __func__);
}

/* fprintf(fout_g, "buf size = %llu\n", buf_size); */

// Get available system memory
hsize_t avail_mem = (hsize_t)get_avphys_pages() * sysconf(_SC_PAGESIZE);
func_log_uint64_1(__func__, "system available memory bytes", avail_mem);
func_log_uint64_1(__func__, "vol-async used memory bytes", async_instance_g->used_mem);
func_log_uint64_1(__func__, "vol-async max memory bytes", async_instance_g->max_mem);

if (async_instance_g->used_mem + buf_size > async_instance_g->max_mem) {
is_blocking = true;
@@ -10142,6 +10176,7 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
async_instance_g->mpi_rank, buf_size, avail_mem);
}
else if (buf_size > 0) {
func_log_uint64_1(__func__, "allocate double memory bytes", buf_size);
if (NULL == (args->buf[i] = malloc(buf_size))) {
fprintf(fout_g, " [ASYNC VOL ERROR] %s malloc failed!\n", __func__);
goto done;
@@ -10152,8 +10187,10 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o

// If is contiguous space, no need to go through gather process as it can be costly
if (1 != is_contig_memspace(mem_space_id[i])) {
- /* fprintf(fout_g," [ASYNC VOL LOG] %s will gather!\n", __func__); */
- H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, args->buf[i], NULL, NULL);
+ func_log(__func__, "memspace is not contiguous, need H5Dgather");
+ if (H5Dgather(mem_space_id[i], buf[i], mem_type_id[i], buf_size, args->buf[i], NULL, NULL) < 0)
+ fprintf(fout_g, " [ASYNC VOL ERROR] %s H5Dgather failed!\n", __func__);
+
hsize_t elem_size = H5Tget_size(mem_type_id[i]);
if (elem_size == 0)
elem_size = 1;
@@ -10163,9 +10200,14 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
args->mem_space_id[i] = H5Screate_simple(1, &n_elem, NULL);
}
else {
func_log(__func__, "memspace contiguous, memcpy directly");
memcpy(args->buf[i], buf[i], buf_size);
}
}
+ else {
+ func_log(__func__, "write size is 0, no memcpy");
+ }
+ func_log_int1(__func__, "end write memcpy iter", i);
}
#endif

@@ -10180,7 +10222,7 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
new_req->file_async_obj = parent_obj[0]->file_async_obj;
*req = (void *)new_req;
}
- // Allow implicit mode to do async dset write
+ // Comment out the code below to allow implicit mode async dset write
/* else { */
/* is_blocking = true; */
/* async_instance_g->start_abt_push = true; */
@@ -10235,15 +10277,13 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
parent_obj[0]->task_cnt++;
parent_obj[0]->pool_ptr = &aid->pool;

- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
H5Pget_dxpl_mpio(plist_id, &xfer_mode);
#endif

/* Check if its parent has valid object */
if (NULL == parent_obj[0]->under_object) {
if (NULL != parent_obj[0]->create_task) {
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
if (xfer_mode == H5FD_MPIO_COLLECTIVE)
add_task_to_queue(&aid->qhead, async_task, COLLECTIVE);
@@ -10257,7 +10297,6 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o
}
}
else {
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
if (xfer_mode == H5FD_MPIO_COLLECTIVE)
add_task_to_queue(&aid->qhead, async_task, COLLECTIVE);
@@ -10677,8 +10716,6 @@ async_dataset_write_fn(void *foo)
H5Sclose(args->file_space_id);
if (args->plist_id > 0)
H5Pclose(args->plist_id);
- free(args);
- task->args = NULL;

if (is_lock == 1) {
if (ABT_mutex_unlock(task->async_obj->obj_mutex) != ABT_SUCCESS)
@@ -10708,6 +10745,8 @@ async_dataset_write_fn(void *foo)
#endif
}
#endif
+ free(args);
+ task->args = NULL;

#ifdef ENABLE_TIMING
task->end_time = clock();
@@ -10898,7 +10937,6 @@ async_dataset_write(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_t
}
}
else {
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
H5FD_mpio_xfer_t xfer_mode;
H5Pget_dxpl_mpio(plist_id, &xfer_mode);
@@ -14666,7 +14704,6 @@ async_file_create(async_instance_t *aid, const char *name, unsigned flags, hid_t
}
async_obj->task_cnt++;
async_obj->pool_ptr = &aid->pool;
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
H5Pget_coll_metadata_write(fapl_id, &async_obj->is_col_meta);
#endif
@@ -15019,7 +15056,6 @@ async_file_open(task_list_qtype qtype, async_instance_t *aid, const char *name,
}
async_obj->task_cnt++;
async_obj->pool_ptr = &aid->pool;
- /* #ifndef ASYNC_VOL_NO_MPI */
#ifdef MPI_VERSION
H5Pget_coll_metadata_write(fapl_id, &async_obj->is_col_meta);
#endif
