From 23a7e9a6b7efdfa491b7efedcea8a6cc6971a480 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Sat, 7 Oct 2023 18:30:01 -0400 Subject: [PATCH] Add heartbeat capability Presence is controlled by a build-time option. Start a separate thread which simply sleeps. When heartbeats are enabled, this thread wakes up at specified intervals to verify that user code is heartbeating as requested and if not, prints task backtraces. Also fixes the call to `maxthreadid` in `generate_precompile.jl`. --- contrib/generate_precompile.jl | 2 +- src/init.c | 4 + src/options.h | 3 + src/threading.c | 221 +++++++++++++++++++++++++++++++++ 4 files changed, 229 insertions(+), 1 deletion(-) diff --git a/contrib/generate_precompile.jl b/contrib/generate_precompile.jl index 92dde676244e4f..9f36e28353c971 100644 --- a/contrib/generate_precompile.jl +++ b/contrib/generate_precompile.jl @@ -4,7 +4,7 @@ @eval Core.Module() begin if Threads.maxthreadid() != 1 - @warn "Running this file with multiple Julia threads may lead to a build error" Threads.maxthreadid() + @warn "Running this file with multiple Julia threads may lead to a build error" Base.Threads.maxthreadid() end if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0" diff --git a/src/init.c b/src/init.c index 2c1ad618948f85..2b4ebd18fe9ead 100644 --- a/src/init.c +++ b/src/init.c @@ -843,6 +843,8 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) _finish_julia_init(rel, ptls, ct); } +void jl_init_heartbeat(void); + static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_task_t *ct) { JL_TIMING(JULIA_INIT, JULIA_INIT); @@ -890,6 +892,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ } jl_start_threads(); + jl_init_heartbeat(); + jl_gc_enable(1); if (jl_options.image_file && (!jl_generating_output() || jl_options.incremental) && jl_module_init_order) { diff --git a/src/options.h b/src/options.h index 69b31ea9180080..fc382cd0a0dd6d 100644 --- a/src/options.h +++ b/src/options.h @@ -144,6 +144,9 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// heartbeats +#define JL_HEARTBEAT_THREAD + // sanitizer defaults --------------------------------------------------------- // Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers diff --git a/src/threading.c b/src/threading.c index eb76ac579b5389..b78cfc826d836a 100644 --- a/src/threading.c +++ b/src/threading.c @@ -984,6 +984,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz) return jl_gc_alignment(sz); } +// Heartbeat mechanism for Julia's task scheduler +// --- +// Start a thread that does not participate in running Julia's tasks. This +// thread simply sleeps until the heartbeat mechanism is enabled. When +// enabled, the heartbeat thread enters a loop in which it blocks waiting +// for the specified heartbeat interval. If, within that interval, +// `jl_heartbeat()` is *not* called at least once, then the thread calls +// `jl_print_task_backtraces(0)`. + +#ifdef JL_HEARTBEAT_THREAD + +#include + +volatile int heartbeat_enabled; +uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread + heartbeat_off_sem; // thread -> jl_heartbeat_enable +int heartbeat_interval_s, + n_loss_reports, + reset_reporting_s; +int last_report_s, report_interval_s, n_reported; +_Atomic(int) heartbeats; + +JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT; +void jl_heartbeat_threadfun(void *arg); + +// start the heartbeat thread with heartbeats disabled +void jl_init_heartbeat(void) +{ + uv_thread_t uvtid; + heartbeat_enabled = 0; + uv_sem_init(&heartbeat_on_sem, 0); + uv_sem_init(&heartbeat_off_sem, 0); + uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL); + uv_thread_detach(&uvtid); +} + +// enable/disable heartbeats +// heartbeat_s: interval within which jl_heartbeat() must be called +// n_reports: for one heartbeat loss interval, how many times to report +// reset_reporting_after_s: how long to wait after a heartbeat loss +// interval and a return to steady heartbeats, before resetting +// reporting behavior +// +// When disabling heartbeats, the heartbeat thread must wake up, +// find out that heartbeats are now diabled, and reset. For now, we +// handle this by preventing re-enabling of heartbeats until this +// completes. +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + if (heartbeat_s <= 0) { + heartbeat_enabled = 0; + heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0; + } + else { + // must disable before enabling + if (heartbeat_enabled) { + return -1; + } + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + jl_atomic_store_relaxed(&heartbeats, 0); + heartbeat_interval_s = heartbeat_s; + n_loss_reports = n_reports; + reset_reporting_s = reset_reporting_after_s; + last_report_s = 0; + report_interval_s = heartbeat_interval_s; + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + } + return 0; +} + +// heartbeat +JL_DLLEXPORT void jl_heartbeat(void) +{ + jl_atomic_fetch_add(&heartbeats, 1); +} + +// sleep the thread for the specified interval +void sleep_for(int secs, int nsecs) +{ + struct timespec rqtp, rmtp; + rqtp.tv_sec = secs; + rqtp.tv_nsec = nsecs; + rmtp.tv_sec = 0; + rmtp.tv_nsec = 0; + for (; ;) { + // this suspends the thread so we aren't using CPU + if (nanosleep(&rqtp, &rmtp) == 0) { + return; + } + // TODO: else if (errno == EINTR) + // this could be SIGTERM and we should shutdown but how to find out? + rqtp = rmtp; + } +} + +// check for heartbeats and maybe report loss +uint8_t check_heartbeats(uint8_t gc_state) +{ + int hb = jl_atomic_exchange(&heartbeats, 0); + uint64_t curr_s = jl_hrtime() / 1e9; + + if (hb <= 0) { + // we didn't get a heartbeat in the last interval; should we report? + if (n_reported < n_loss_reports && + curr_s - last_report_s >= report_interval_s) { + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // exit GC-safe region to report then re-enter + jl_gc_safe_leave(ptls, gc_state); + jl_safe_printf("==== heartbeat loss ====\n"); + jl_print_task_backtraces(0); + gc_state = jl_gc_safe_enter(ptls); + + // we've reported + n_reported++; + + // record the reporting time _after_ the report + last_report_s = jl_hrtime() / 1e9; + + // double the reporting interval up to a maximum + if (report_interval_s < 60 * heartbeat_interval_s) { + report_interval_s *= 2; + } + } + // no heartbeats, don't change reporting state + return gc_state; + } + else { + // we got a heartbeat; reset the report count + n_reported = 0; + } + + // reset the reporting interval only once we're steadily getting + // heartbeats for the requested reset interval + if (curr_s - reset_reporting_s > last_report_s) { + report_interval_s = heartbeat_interval_s; + } + + return gc_state; +} + +// heartbeat thread function +void jl_heartbeat_threadfun(void *arg) +{ + int s, ns = 1e9 - 1, rs; + uint64_t t0, tchb; + + // We need a TLS because backtraces are accumulated into ptls->bt_size + // and ptls->bt_data, so we need to call jl_adopt_thread(). + jl_adopt_thread(); + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // Don't hold up GC, this thread doesn't participate. + uint8_t gc_state = jl_gc_safe_enter(ptls); + + for (;;) { + if (!heartbeat_enabled) { + // post the off semaphore to indicate we're ready to enable + uv_sem_post(&heartbeat_off_sem); + + // sleep the thread here; this semaphore is posted in + // jl_heartbeat_enable() + uv_sem_wait(&heartbeat_on_sem); + + // Set the sleep duration. + s = heartbeat_interval_s - 1; + ns = 1e9 - 1; + continue; + } + + // heartbeat is enabled; sleep, waiting for the desired interval + sleep_for(s, ns); + + // if heartbeats were turned off while we were sleeping, reset + if (!heartbeat_enabled) { + continue; + } + + // check if any heartbeats have happened, report as appropriate + t0 = jl_hrtime(); + gc_state = check_heartbeats(gc_state); + tchb = jl_hrtime() - t0; + + // adjust the next sleep duration based on how long the heartbeat + // check took + rs = 1; + while (tchb > 1e9) { + rs++; + tchb -= 1e9; + } + s = heartbeat_interval_s - rs; + ns = 1e9 - tchb; + } +} + +#else // !JL_HEARTBEAT_THREAD + +void jl_init_heartbeat(void) +{ +} + +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + return -1; +} + +JL_DLLEXPORT void jl_heartbeat(void) +{ +} + +#endif // JL_HEARTBEAT_THREAD + #ifdef __cplusplus } #endif