From 55abcccb1199e1d855bc0a790dfbe76b4d6a1b14 Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Fri, 23 Aug 2024 11:21:21 -0500 Subject: [PATCH] HG: add HG_Diag_dump_counters() to dump diagnostic counters Add rpc_req_recv_active_count and rpc_multi_recv_copy_count counters --- src/mercury.c | 9 +++++++++ src/mercury.h | 6 ++++++ src/mercury_core.c | 38 +++++++++++++++++++++++++++++++++++++- src/util/mercury_dlog.c | 6 ++---- src/util/mercury_log.c | 17 +++++++++++++++-- src/util/mercury_log.h | 8 ++++++++ 6 files changed, 77 insertions(+), 7 deletions(-) diff --git a/src/mercury.c b/src/mercury.c index d5256d49..a916a49b 100644 --- a/src/mercury.c +++ b/src/mercury.c @@ -1207,6 +1207,15 @@ HG_Set_log_stream(const char *level, FILE *stream) } } +/*---------------------------------------------------------------------------*/ +void +HG_Diag_dump_counters(void) +{ +#ifndef _WIN32 + hg_log_dump_counters(&HG_LOG_OUTLET(hg_diag)); +#endif +} + /*---------------------------------------------------------------------------*/ hg_return_t HG_Class_set_handle_create_callback(hg_class_t *hg_class, diff --git a/src/mercury.h b/src/mercury.h index 09e04006..5ec9f3c8 100644 --- a/src/mercury.h +++ b/src/mercury.h @@ -191,6 +191,12 @@ HG_Set_log_func(int (*log_func)(FILE *stream, const char *format, ...)); HG_PUBLIC void HG_Set_log_stream(const char *level, FILE *stream); +/** + * Dump diagnostic counters into the existing log stream. + */ +HG_PUBLIC void +HG_Diag_dump_counters(void); + /** * Obtain the name of the given class. * diff --git a/src/mercury_core.c b/src/mercury_core.c index 7c4bf36a..9f620399 100644 --- a/src/mercury_core.c +++ b/src/mercury_core.c @@ -167,7 +167,10 @@ struct hg_core_counters { hg_atomic_int64_t *rpc_resp_recv_count; /* RPC responses received */ hg_atomic_int64_t *rpc_req_extra_count; /* RPC that require extra data */ hg_atomic_int64_t *rpc_resp_extra_count; /* RPC that require extra data */ - hg_atomic_int64_t *bulk_count; /* Bulk count */ + hg_atomic_int64_t *rpc_req_recv_active_count; /* Currently active RPCs */ + hg_atomic_int64_t *rpc_multi_recv_copy_count; /* RPCs requests received that + required a copy */ + hg_atomic_int64_t *bulk_count; /* Bulk count */ }; /* HG class */ @@ -362,6 +365,9 @@ struct hg_core_private_handle { uint8_t cookie; /* Cookie */ bool multi_recv_copy; /* Copy on multi-recv */ bool reuse; /* Re-use handle once ref_count is 0 */ +#if defined(HG_HAS_DEBUG) && !defined(_WIN32) + bool active; +#endif }; /* HG op id */ @@ -1077,6 +1083,10 @@ hg_core_counters_init(struct hg_core_counters *hg_core_counters) * order */ HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->bulk_count, "bulk_count", "Bulk transfers (inc. extra bulks)"); + HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_multi_recv_copy_count, + "rpc_multi_recv_copy_count", "RPC requests received requiring a copy"); + HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_recv_active_count, + "rpc_req_recv_active_count", "RPC requests received still active"); HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_resp_extra_count, "rpc_resp_extra_count", "RPCs with extra bulk response"); HG_LOG_ADD_COUNTER64(hg_diag, &hg_core_counters->rpc_req_extra_count, @@ -3370,6 +3380,14 @@ hg_core_destroy(struct hg_core_private_handle *hg_core_handle) return HG_SUCCESS; /* Cannot free yet */ } +#if defined(HG_HAS_DEBUG) && !defined(_WIN32) + if (hg_core_handle->active) { + hg_atomic_decr64(HG_CORE_HANDLE_CLASS(hg_core_handle) + ->counters.rpc_req_recv_active_count); + hg_core_handle->active = false; + } +#endif + /* Re-use handle if we were listening, otherwise destroy it */ if (hg_core_handle->reuse && !hg_atomic_get32(&HG_CORE_HANDLE_CONTEXT(hg_core_handle)->unposting)) { @@ -4447,6 +4465,12 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info) hg_thread_spin_lock(&hg_core_handle_pool->pending_list.lock); LIST_REMOVE(hg_core_handle, pending); hg_thread_spin_unlock(&hg_core_handle_pool->pending_list.lock); +#if defined(HG_HAS_DEBUG) && !defined(_WIN32) + /* Increment counter */ + hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle) + ->counters.rpc_req_recv_active_count); + hg_core_handle->active = true; +#endif if (callback_info->ret == NA_SUCCESS) { /* Extend pool if all handles are being utilized */ @@ -4549,6 +4573,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) ret = hg_core_handle_pool_get(context->handle_pool, &hg_core_handle); HG_CHECK_SUBSYS_HG_ERROR( rpc, error, ret, "Could not get handle from pool"); +#if defined(HG_HAS_DEBUG) && !defined(_WIN32) + /* Increment counter */ + hg_atomic_incr64(HG_CORE_HANDLE_CLASS(hg_core_handle) + ->counters.rpc_req_recv_active_count); + hg_core_handle->active = true; +#endif hg_core_handle->multi_recv_op = multi_recv_op; hg_atomic_incr32(&multi_recv_op->op_count); hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV); @@ -4600,6 +4630,12 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) "Copying multi-recv payload of size %zu for handle (%p)", hg_core_handle->core_handle.in_buf_used, (void *) hg_core_handle); +#if defined(HG_HAS_DEBUG) && !defined(_WIN32) + /* Increment counter */ + hg_atomic_incr64(HG_CORE_CONTEXT_CLASS(context) + ->counters.rpc_multi_recv_copy_count); +#endif + memcpy(hg_core_handle->in_buf_storage, na_cb_info_multi_recv_unexpected->actual_buf, hg_core_handle->core_handle.in_buf_used); diff --git a/src/util/mercury_dlog.c b/src/util/mercury_dlog.c index 6c475696..a5ff1c92 100644 --- a/src/util/mercury_dlog.c +++ b/src/util/mercury_dlog.c @@ -257,12 +257,11 @@ hg_dlog_dump_counters(struct hg_dlog *d, if (!SLIST_EMPTY(&d->cnts32) || !SLIST_EMPTY(&d->cnts64)) { log_func(stream, - "### ----------------------\n" + "### --------------------------\n" "### (%s) counter log summary\n" - "### ----------------------\n", + "### --------------------------\n", (d->dlog_magic + strlen(HG_DLOG_STDMAGIC))); - log_func(stream, "# Counters\n"); SLIST_FOREACH (dc32, &d->cnts32, l) { log_func(stream, "# %s: %" PRId32 " [%s]\n", dc32->name, hg_atomic_get32(&dc32->c), dc32->descr); @@ -271,7 +270,6 @@ hg_dlog_dump_counters(struct hg_dlog *d, log_func(stream, "# %s: %" PRId64 " [%s]\n", dc64->name, hg_atomic_get64(&dc64->c), dc64->descr); } - log_func(stream, "# -\n"); } hg_thread_mutex_unlock(&d->dlock); diff --git a/src/util/mercury_log.c b/src/util/mercury_log.c index c4d16510..7f33c650 100644 --- a/src/util/mercury_log.c +++ b/src/util/mercury_log.c @@ -508,6 +508,20 @@ hg_log_outlet_deregister(struct hg_log_outlet *hg_log_outlet) hg_log_outlet->registered = false; } +/*---------------------------------------------------------------------------*/ +void +hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet) +{ + if (hg_log_outlet->debug_log && + hg_log_outlet->level >= HG_LOG_LEVEL_MIN_DEBUG) { + FILE *stream = hg_log_streams_g[hg_log_outlet->level] + ? hg_log_streams_g[hg_log_outlet->level] + : *hg_log_std_streams_g[hg_log_outlet->level]; + hg_dlog_dump_counters( + hg_log_outlet->debug_log, hg_log_func_g, stream, 0); + } +} + /*---------------------------------------------------------------------------*/ void hg_log_write(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level, @@ -568,8 +582,7 @@ hg_log_vwrite(struct hg_log_outlet *hg_log_outlet, enum hg_log_level log_level, #else /* Print using logging function */ hg_log_func_g(stream, - "# [%lf] %s->%s: [%s] %s%s%s:%d\n" - " # %s(): %s%s", + "# [%lf] %s->%s [%s] %s%s%s:%d %s() %s%s", hg_time_to_double(tv), "mercury", hg_log_outlet->name, level_name, module ? module : "", module ? ":" : "", file, line, func, buf, no_return ? "" : "\n"); diff --git a/src/util/mercury_log.h b/src/util/mercury_log.h index 416fa6d0..dc893a45 100644 --- a/src/util/mercury_log.h +++ b/src/util/mercury_log.h @@ -483,6 +483,14 @@ hg_log_outlet_register(struct hg_log_outlet *outlet); HG_UTIL_PUBLIC void hg_log_outlet_deregister(struct hg_log_outlet *outlet); +/** + * Dump counters associated to log outlet. + * + * \param outlet [IN] log outlet + */ +HG_UTIL_PUBLIC void +hg_log_dump_counters(struct hg_log_outlet *hg_log_outlet); + /** * Write log. *