Skip to content

Commit

Permalink
HG: add HG_Diag_dump_counters() to dump diagnostic counters
Browse files Browse the repository at this point in the history
Add rpc_req_recv_active_count and rpc_multi_recv_copy_count counters
  • Loading branch information
soumagne committed Aug 22, 2024
1 parent 2635745 commit c6a92e5
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 7 deletions.
9 changes: 9 additions & 0 deletions src/mercury.c
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,15 @@ HG_Set_log_stream(const char *level, FILE *stream)
}
}

/*---------------------------------------------------------------------------*/
/* Dump the counters of the hg_diag log outlet by forwarding to
 * hg_log_dump_counters(). The body is compiled out when _WIN32 is defined,
 * leaving an empty function. */
void
HG_Diag_dump_counters(void)
{
#if !defined(_WIN32)
    struct hg_log_outlet *diag_outlet = &HG_LOG_OUTLET(hg_diag);

    hg_log_dump_counters(diag_outlet);
#endif
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Class_set_handle_create_callback(hg_class_t *hg_class,
Expand Down
6 changes: 6 additions & 0 deletions src/mercury.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ HG_Set_log_func(int (*log_func)(FILE *stream, const char *format, ...));
HG_PUBLIC void
HG_Set_log_stream(const char *level, FILE *stream);

/**
 * Dump diagnostic counters into the existing log stream.
 *
 * Takes no argument and returns nothing. The implementation is compiled out
 * when _WIN32 is defined, in which case the call does nothing. Counters are
 * only written when the diagnostic log outlet has a debug log attached and its
 * level is at least HG_LOG_LEVEL_MIN_DEBUG.
 */
HG_PUBLIC void
HG_Diag_dump_counters(void);

/**
* Obtain the name of the given class.
*
Expand Down
57 changes: 55 additions & 2 deletions src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ struct hg_core_counters {
hg_atomic_int64_t *rpc_resp_recv_count; /* RPC responses received */
hg_atomic_int64_t *rpc_req_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *rpc_resp_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *bulk_count; /* Bulk count */
hg_atomic_int64_t *rpc_req_recv_active_count; /* Currently active RPCs */
hg_atomic_int64_t *rpc_multi_recv_copy_count; /* RPCs requests received that
required a copy */
hg_atomic_int64_t *bulk_count; /* Bulk count */
};

/* HG class */
Expand Down Expand Up @@ -362,6 +365,9 @@ struct hg_core_private_handle {
uint8_t cookie; /* Cookie */
bool multi_recv_copy; /* Copy on multi-recv */
bool reuse; /* Re-use handle once ref_count is 0 */
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
bool active;
#endif
};

/* HG op id */
Expand Down Expand Up @@ -1077,6 +1083,10 @@ hg_core_counters_init(struct hg_core_counters *hg_core_counters)
* order */
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->bulk_count, "bulk_count",
"Bulk transfers (inc. extra bulks)");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_multi_recv_copy_count,
"rpc_multi_recv_copy_count", "RPC requests received requiring a copy");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_recv_active_count,
"rpc_req_recv_active_count", "RPC requests received still active");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_resp_extra_count,
"rpc_resp_extra_count", "RPCs with extra bulk response");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_extra_count,
Expand Down Expand Up @@ -1218,7 +1228,7 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
if (hg_init_info_p) {
HG_CHECK_SUBSYS_ERROR(cls, version == 0, error, ret, HG_INVALID_ARG,
"API version cannot be 0");
HG_LOG_SUBSYS_DEBUG(cls, "Init info version used: v%d.%d",
HG_LOG_SUBSYS_DEBUG(cls, "HG init info version used: v%d.%d",
HG_MAJOR(version), HG_MINOR(version));
na_init_info_p = &na_init_info;

Expand All @@ -1232,6 +1242,23 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
else
hg_init_info_dup_2_2(&hg_init_info,
(const struct hg_init_info_2_2 *) hg_init_info_p);

HG_LOG_SUBSYS_DEBUG(cls,
"HG Init info: na_class=%p, request_post_init=%" PRIu32
", request_post_incr=%" PRId32 ", auto_sm=%" PRIu8
", sm_info_string=%s, checksum_level=%d, no_bulk_eager=%" PRIu8
", no_loopback=%" PRIu8 ", stats=%" PRIu8 ", no_multi_recv=%" PRIu8
", release_input_early=%" PRIu8
", traffic_class=%d, no_overflow=%d, multi_recv_op_max=%u, "
"multi_recv_copy_threshold=%u",
(void *) hg_init_info.na_class, hg_init_info.request_post_init,
hg_init_info.request_post_incr, hg_init_info.auto_sm,
hg_init_info.sm_info_string, hg_init_info.checksum_level,
hg_init_info.no_bulk_eager, hg_init_info.no_loopback,
hg_init_info.stats, hg_init_info.no_multi_recv,
hg_init_info.release_input_early, hg_init_info.traffic_class,
hg_init_info.no_overflow, hg_init_info.multi_recv_op_max,
hg_init_info.multi_recv_copy_threshold);
}

/* Set post init / incr / multi-recv values */
Expand Down Expand Up @@ -3370,6 +3397,14 @@ hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
return HG_SUCCESS; /* Cannot free yet */
}

#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
if (hg_core_handle->active) {
hg_atomic_decr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = false;
}
#endif

/* Re-use handle if we were listening, otherwise destroy it */
if (hg_core_handle->reuse &&
!hg_atomic_get32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->unposting)) {
Expand Down Expand Up @@ -4447,6 +4482,12 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_thread_spin_lock(&hg_core_handle_pool->pending_list.lock);
LIST_REMOVE(hg_core_handle, pending);
hg_thread_spin_unlock(&hg_core_handle_pool->pending_list.lock);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif

if (callback_info->ret == NA_SUCCESS) {
/* Extend pool if all handles are being utilized */
Expand Down Expand Up @@ -4549,6 +4590,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
ret = hg_core_handle_pool_get(context->handle_pool, &hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(
rpc, error, ret, "Could not get handle from pool");
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif
hg_core_handle->multi_recv_op = multi_recv_op;
hg_atomic_incr32(&multi_recv_op->op_count);
hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV);
Expand Down Expand Up @@ -4600,6 +4647,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_CONTEXT_CLASS(context)
->counters.rpc_multi_recv_copy_count);
#endif

memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->core_handle.in_buf_used);
Expand Down
17 changes: 16 additions & 1 deletion src/na/na.c
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ NA_Initialize_opt2(const char *info_string, bool listen, unsigned int version,
if (na_init_info) {
NA_CHECK_SUBSYS_ERROR(fatal, version == 0, error, ret, NA_INVALID_ARG,
"API version cannot be 0");
NA_LOG_SUBSYS_DEBUG(cls, "Init info version used: v%d.%d",
NA_LOG_SUBSYS_DEBUG(cls, "NA init info version used: v%d.%d",
NA_MAJOR(version), NA_MINOR(version));

/* Get init info and overwrite defaults */
Expand All @@ -808,6 +808,21 @@ NA_Initialize_opt2(const char *info_string, bool listen, unsigned int version,
na_init_info_dup_4_0(&na_info->na_init_info,
(const struct na_init_info_4_0 *) na_init_info);

NA_LOG_SUBSYS_DEBUG(cls,
"NA Init info: ip_subnet=%s, auth_key=%s, max_unexpected_size=%zu, "
"max_expected_size=%zu, progress_mode=%" PRIu8
", addr_format=%d, max_contexts=%" PRIu8 ", thread_mode=%" PRIu8
", request_mem_device=%u, traffic_class=%d",
na_info->na_init_info.ip_subnet, na_info->na_init_info.auth_key,
na_info->na_init_info.max_unexpected_size,
na_info->na_init_info.max_expected_size,
na_info->na_init_info.progress_mode,
na_info->na_init_info.addr_format,
na_info->na_init_info.max_contexts,
na_info->na_init_info.thread_mode,
na_info->na_init_info.request_mem_device,
na_info->na_init_info.traffic_class);

na_private_class->na_class.progress_mode = na_init_info->progress_mode;
}

Expand Down
6 changes: 2 additions & 4 deletions src/util/mercury_dlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,11 @@ hg_dlog_dump_counters(struct hg_dlog *d,

if (!SLIST_EMPTY(&d->cnts32) || !SLIST_EMPTY(&d->cnts64)) {
log_func(stream,
"### ----------------------\n"
"### --------------------------\n"
"### (%s) counter log summary\n"
"### ----------------------\n",
"### --------------------------\n",
(d->dlog_magic + strlen(HG_DLOG_STDMAGIC)));

log_func(stream, "# Counters\n");
SLIST_FOREACH (dc32, &d->cnts32, l) {
log_func(stream, "# %s: %" PRId32 " [%s]\n", dc32->name,
hg_atomic_get32(&dc32->c), dc32->descr);
Expand All @@ -271,7 +270,6 @@ hg_dlog_dump_counters(struct hg_dlog *d,
log_func(stream, "# %s: %" PRId64 " [%s]\n", dc64->name,
hg_atomic_get64(&dc64->c), dc64->descr);
}
log_func(stream, "# -\n");
}

hg_thread_mutex_unlock(&d->dlock);
Expand Down
14 changes: 14 additions & 0 deletions src/util/mercury_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,20 @@ hg_log_outlet_deregister(struct hg_log_outlet *hg_log_outlet)
hg_log_outlet->registered = false;
}

/*---------------------------------------------------------------------------*/
/* Dump the counters held by the debug log of the given outlet.
 *
 * Nothing is written unless the outlet has a debug log and its level is at
 * least HG_LOG_LEVEL_MIN_DEBUG. The output stream is the hg_log_streams_g
 * entry for the outlet level when it is non-NULL, otherwise the stream
 * referenced by the matching hg_log_std_streams_g entry. */
void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet)
{
    FILE *out_stream;

    /* Bail out early when there is no debug log or the level is too low */
    if (!hg_log_outlet->debug_log ||
        hg_log_outlet->level < HG_LOG_LEVEL_MIN_DEBUG)
        return;

    /* Select the per-level stream, falling back to the standard one */
    out_stream = hg_log_streams_g[hg_log_outlet->level];
    if (!out_stream)
        out_stream = *hg_log_std_streams_g[hg_log_outlet->level];

    hg_dlog_dump_counters(
        hg_log_outlet->debug_log, hg_log_func_g, out_stream, 0);
}

/*---------------------------------------------------------------------------*/
void
hg_log_write(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level,
Expand Down
8 changes: 8 additions & 0 deletions src/util/mercury_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ hg_log_outlet_register(struct hg_log_outlet *outlet);
HG_UTIL_PUBLIC void
hg_log_outlet_deregister(struct hg_log_outlet *outlet);

/**
 * Dump counters associated to log outlet.
 *
 * Counters are only written when the outlet has a debug log attached and its
 * level is at least HG_LOG_LEVEL_MIN_DEBUG; otherwise the call does nothing.
 *
 * \param hg_log_outlet [IN]    log outlet
 */
HG_UTIL_PUBLIC void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet);

/**
* Write log.
*
Expand Down

0 comments on commit c6a92e5

Please sign in to comment.