Skip to content

Commit

Permalink
HG: add HG_Diag_dump_counters() to dump diagnostic counters
Browse files Browse the repository at this point in the history
Add rpc_req_recv_active_count and rpc_multi_recv_copy_count counters
  • Loading branch information
soumagne committed Aug 23, 2024
1 parent 2635745 commit 55abccc
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 7 deletions.
9 changes: 9 additions & 0 deletions src/mercury.c
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,15 @@ HG_Set_log_stream(const char *level, FILE *stream)
}
}

/*---------------------------------------------------------------------------*/
void
HG_Diag_dump_counters(void)
{
    /* The dump is compiled out when building for _WIN32, which makes this
     * routine an empty function on that platform. Elsewhere the counters
     * attached to the hg_diag log outlet are handed to the log layer. */
#if !defined(_WIN32)
    hg_log_dump_counters(&HG_LOG_OUTLET(hg_diag));
#endif
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Class_set_handle_create_callback(hg_class_t *hg_class,
Expand Down
6 changes: 6 additions & 0 deletions src/mercury.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ HG_Set_log_func(int (*log_func)(FILE *stream, const char *format, ...));
HG_PUBLIC void
HG_Set_log_stream(const char *level, FILE *stream);

/**
 * Dump diagnostic counters into the existing log stream.
 *
 * The counters dumped are those attached to the hg_diag log outlet. Takes no
 * arguments and returns nothing. When the library is built for _WIN32 the
 * call is compiled to a no-op.
 */
HG_PUBLIC void
HG_Diag_dump_counters(void);

/**
* Obtain the name of the given class.
*
Expand Down
38 changes: 37 additions & 1 deletion src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ struct hg_core_counters {
hg_atomic_int64_t *rpc_resp_recv_count; /* RPC responses received */
hg_atomic_int64_t *rpc_req_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *rpc_resp_extra_count; /* RPC that require extra data */
hg_atomic_int64_t *bulk_count; /* Bulk count */
hg_atomic_int64_t *rpc_req_recv_active_count; /* Currently active RPCs */
hg_atomic_int64_t *rpc_multi_recv_copy_count; /* RPCs requests received that
required a copy */
hg_atomic_int64_t *bulk_count; /* Bulk count */
};

/* HG class */
Expand Down Expand Up @@ -362,6 +365,9 @@ struct hg_core_private_handle {
uint8_t cookie; /* Cookie */
bool multi_recv_copy; /* Copy on multi-recv */
bool reuse; /* Re-use handle once ref_count is 0 */
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
bool active;
#endif
};

/* HG op id */
Expand Down Expand Up @@ -1077,6 +1083,10 @@ hg_core_counters_init(struct hg_core_counters *hg_core_counters)
* order */
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->bulk_count, "bulk_count",
"Bulk transfers (inc. extra bulks)");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_multi_recv_copy_count,
"rpc_multi_recv_copy_count", "RPC requests received requiring a copy");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_recv_active_count,
"rpc_req_recv_active_count", "RPC requests received still active");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_resp_extra_count,
"rpc_resp_extra_count", "RPCs with extra bulk response");
HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_extra_count,
Expand Down Expand Up @@ -3370,6 +3380,14 @@ hg_core_destroy(struct hg_core_private_handle *hg_core_handle)
return HG_SUCCESS; /* Cannot free yet */
}

#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
if (hg_core_handle->active) {
hg_atomic_decr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = false;
}
#endif

/* Re-use handle if we were listening, otherwise destroy it */
if (hg_core_handle->reuse &&
!hg_atomic_get32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->unposting)) {
Expand Down Expand Up @@ -4447,6 +4465,12 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_thread_spin_lock(&hg_core_handle_pool->pending_list.lock);
LIST_REMOVE(hg_core_handle, pending);
hg_thread_spin_unlock(&hg_core_handle_pool->pending_list.lock);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif

if (callback_info->ret == NA_SUCCESS) {
/* Extend pool if all handles are being utilized */
Expand Down Expand Up @@ -4549,6 +4573,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
ret = hg_core_handle_pool_get(context->handle_pool, &hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(
rpc, error, ret, "Could not get handle from pool");
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle)
->counters.rpc_req_recv_active_count);
hg_core_handle->active = true;
#endif
hg_core_handle->multi_recv_op = multi_recv_op;
hg_atomic_incr32(&multi_recv_op->op_count);
hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV);
Expand Down Expand Up @@ -4600,6 +4630,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
#if defined(HG_HAS_DEBUG) && !defined(_WIN32)
/* Increment counter */
hg_atomic_incr64(HG_CORE_CONTEXT_CLASS(context)
->counters.rpc_multi_recv_copy_count);
#endif

memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->core_handle.in_buf_used);
Expand Down
6 changes: 2 additions & 4 deletions src/util/mercury_dlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,11 @@ hg_dlog_dump_counters(struct hg_dlog *d,

if (!SLIST_EMPTY(&d->cnts32) || !SLIST_EMPTY(&d->cnts64)) {
log_func(stream,
"### ----------------------\n"
"### --------------------------\n"
"### (%s) counter log summary\n"
"### ----------------------\n",
"### --------------------------\n",
(d->dlog_magic + strlen(HG_DLOG_STDMAGIC)));

log_func(stream, "# Counters\n");
SLIST_FOREACH (dc32, &d->cnts32, l) {
log_func(stream, "# %s: %" PRId32 " [%s]\n", dc32->name,
hg_atomic_get32(&dc32->c), dc32->descr);
Expand All @@ -271,7 +270,6 @@ hg_dlog_dump_counters(struct hg_dlog *d,
log_func(stream, "# %s: %" PRId64 " [%s]\n", dc64->name,
hg_atomic_get64(&dc64->c), dc64->descr);
}
log_func(stream, "# -\n");
}

hg_thread_mutex_unlock(&d->dlock);
Expand Down
17 changes: 15 additions & 2 deletions src/util/mercury_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,20 @@ hg_log_outlet_deregister(struct hg_log_outlet *hg_log_outlet)
hg_log_outlet->registered = false;
}

/*---------------------------------------------------------------------------*/
void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet)
{
    FILE *out;

    /* Nothing to do unless the outlet has a debug log attached and its level
     * is at least HG_LOG_LEVEL_MIN_DEBUG */
    if (!hg_log_outlet->debug_log ||
        hg_log_outlet->level < HG_LOG_LEVEL_MIN_DEBUG)
        return;

    /* Use the hg_log_streams_g entry for the outlet's level when it is set,
     * otherwise fall back to the matching hg_log_std_streams_g entry */
    out = hg_log_streams_g[hg_log_outlet->level];
    if (!out)
        out = *hg_log_std_streams_g[hg_log_outlet->level];

    hg_dlog_dump_counters(hg_log_outlet->debug_log, hg_log_func_g, out, 0);
}

/*---------------------------------------------------------------------------*/
void
hg_log_write(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level,
Expand Down Expand Up @@ -568,8 +582,7 @@ hg_log_vwrite(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level,
#else
/* Print using logging function */
hg_log_func_g(stream,
"# [%lf] %s->%s: [%s] %s%s%s:%d\n"
" # %s(): %s%s",
"# [%lf] %s->%s [%s] %s%s%s:%d %s() %s%s",
hg_time_to_double(tv), "mercury", hg_log_outlet->name, level_name,
module ? module : "", module ? ":" : "", file, line, func, buf,
no_return ? "" : "\n");
Expand Down
8 changes: 8 additions & 0 deletions src/util/mercury_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,14 @@ hg_log_outlet_register(struct hg_log_outlet *outlet);
HG_UTIL_PUBLIC void
hg_log_outlet_deregister(struct hg_log_outlet *outlet);

/**
 * Dump counters associated to log outlet.
 *
 * Counters are only dumped when the outlet has a debug log attached and its
 * level is at least HG_LOG_LEVEL_MIN_DEBUG; otherwise the call does nothing.
 *
 * \param hg_log_outlet [IN]    log outlet
 */
HG_UTIL_PUBLIC void
hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet);

/**
* Write log.
*
Expand Down

0 comments on commit 55abccc

Please sign in to comment.