diff --git a/contrib/generate_precompile.jl b/contrib/generate_precompile.jl index 92dde676244e4f..9f36e28353c971 100644 --- a/contrib/generate_precompile.jl +++ b/contrib/generate_precompile.jl @@ -4,7 +4,7 @@ @eval Core.Module() begin if Threads.maxthreadid() != 1 - @warn "Running this file with multiple Julia threads may lead to a build error" Threads.maxthreadid() + @warn "Running this file with multiple Julia threads may lead to a build error" Base.Threads.maxthreadid() end if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0" diff --git a/src/init.c b/src/init.c index 2c1ad618948f85..2b4ebd18fe9ead 100644 --- a/src/init.c +++ b/src/init.c @@ -843,6 +843,8 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) _finish_julia_init(rel, ptls, ct); } +void jl_init_heartbeat(void); + static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_task_t *ct) { JL_TIMING(JULIA_INIT, JULIA_INIT); @@ -890,6 +892,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ } jl_start_threads(); + jl_init_heartbeat(); + jl_gc_enable(1); if (jl_options.image_file && (!jl_generating_output() || jl_options.incremental) && jl_module_init_order) { diff --git a/src/options.h b/src/options.h index 69b31ea9180080..fc382cd0a0dd6d 100644 --- a/src/options.h +++ b/src/options.h @@ -144,6 +144,9 @@ #define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE" #define DEFAULT_MACHINE_EXCLUSIVE 0 +// heartbeats +#define JL_HEARTBEAT_THREAD + // sanitizer defaults --------------------------------------------------------- // Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers diff --git a/src/threading.c b/src/threading.c index eb76ac579b5389..b78cfc826d836a 100644 --- a/src/threading.c +++ b/src/threading.c @@ -984,6 +984,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz) return jl_gc_alignment(sz); } +// Heartbeat mechanism for Julia's task scheduler +// --- +// Start a thread that does not participate in running Julia's tasks. This +// thread simply sleeps until the heartbeat mechanism is enabled. When +// enabled, the heartbeat thread enters a loop in which it blocks waiting +// for the specified heartbeat interval. If, within that interval, +// `jl_heartbeat()` is *not* called at least once, then the thread calls +// `jl_print_task_backtraces(0)`. + +#ifdef JL_HEARTBEAT_THREAD + +#include + +volatile int heartbeat_enabled; +uv_sem_t heartbeat_on_sem, // jl_heartbeat_enable -> thread + heartbeat_off_sem; // thread -> jl_heartbeat_enable +int heartbeat_interval_s, + n_loss_reports, + reset_reporting_s; +int last_report_s, report_interval_s, n_reported; +_Atomic(int) heartbeats; + +JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT; +void jl_heartbeat_threadfun(void *arg); + +// start the heartbeat thread with heartbeats disabled +void jl_init_heartbeat(void) +{ + uv_thread_t uvtid; + heartbeat_enabled = 0; + uv_sem_init(&heartbeat_on_sem, 0); + uv_sem_init(&heartbeat_off_sem, 0); + uv_thread_create(&uvtid, jl_heartbeat_threadfun, NULL); + uv_thread_detach(&uvtid); +} + +// enable/disable heartbeats +// heartbeat_s: interval within which jl_heartbeat() must be called +// n_reports: for one heartbeat loss interval, how many times to report +// reset_reporting_after_s: how long to wait after a heartbeat loss +// interval and a return to steady heartbeats, before resetting +// reporting behavior +// +// When disabling heartbeats, the heartbeat thread must wake up, +// find out that heartbeats are now diabled, and reset. For now, we +// handle this by preventing re-enabling of heartbeats until this +// completes. +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + if (heartbeat_s <= 0) { + heartbeat_enabled = 0; + heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0; + } + else { + // must disable before enabling + if (heartbeat_enabled) { + return -1; + } + // heartbeat thread must be ready + if (uv_sem_trywait(&heartbeat_off_sem) != 0) { + return -1; + } + + jl_atomic_store_relaxed(&heartbeats, 0); + heartbeat_interval_s = heartbeat_s; + n_loss_reports = n_reports; + reset_reporting_s = reset_reporting_after_s; + last_report_s = 0; + report_interval_s = heartbeat_interval_s; + heartbeat_enabled = 1; + uv_sem_post(&heartbeat_on_sem); // wake the heartbeat thread + } + return 0; +} + +// heartbeat +JL_DLLEXPORT void jl_heartbeat(void) +{ + jl_atomic_fetch_add(&heartbeats, 1); +} + +// sleep the thread for the specified interval +void sleep_for(int secs, int nsecs) +{ + struct timespec rqtp, rmtp; + rqtp.tv_sec = secs; + rqtp.tv_nsec = nsecs; + rmtp.tv_sec = 0; + rmtp.tv_nsec = 0; + for (; ;) { + // this suspends the thread so we aren't using CPU + if (nanosleep(&rqtp, &rmtp) == 0) { + return; + } + // TODO: else if (errno == EINTR) + // this could be SIGTERM and we should shutdown but how to find out? + rqtp = rmtp; + } +} + +// check for heartbeats and maybe report loss +uint8_t check_heartbeats(uint8_t gc_state) +{ + int hb = jl_atomic_exchange(&heartbeats, 0); + uint64_t curr_s = jl_hrtime() / 1e9; + + if (hb <= 0) { + // we didn't get a heartbeat in the last interval; should we report? + if (n_reported < n_loss_reports && + curr_s - last_report_s >= report_interval_s) { + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // exit GC-safe region to report then re-enter + jl_gc_safe_leave(ptls, gc_state); + jl_safe_printf("==== heartbeat loss ====\n"); + jl_print_task_backtraces(0); + gc_state = jl_gc_safe_enter(ptls); + + // we've reported + n_reported++; + + // record the reporting time _after_ the report + last_report_s = jl_hrtime() / 1e9; + + // double the reporting interval up to a maximum + if (report_interval_s < 60 * heartbeat_interval_s) { + report_interval_s *= 2; + } + } + // no heartbeats, don't change reporting state + return gc_state; + } + else { + // we got a heartbeat; reset the report count + n_reported = 0; + } + + // reset the reporting interval only once we're steadily getting + // heartbeats for the requested reset interval + if (curr_s - reset_reporting_s > last_report_s) { + report_interval_s = heartbeat_interval_s; + } + + return gc_state; +} + +// heartbeat thread function +void jl_heartbeat_threadfun(void *arg) +{ + int s, ns = 1e9 - 1, rs; + uint64_t t0, tchb; + + // We need a TLS because backtraces are accumulated into ptls->bt_size + // and ptls->bt_data, so we need to call jl_adopt_thread(). + jl_adopt_thread(); + jl_task_t *ct = jl_current_task; + jl_ptls_t ptls = ct->ptls; + + // Don't hold up GC, this thread doesn't participate. + uint8_t gc_state = jl_gc_safe_enter(ptls); + + for (;;) { + if (!heartbeat_enabled) { + // post the off semaphore to indicate we're ready to enable + uv_sem_post(&heartbeat_off_sem); + + // sleep the thread here; this semaphore is posted in + // jl_heartbeat_enable() + uv_sem_wait(&heartbeat_on_sem); + + // Set the sleep duration. + s = heartbeat_interval_s - 1; + ns = 1e9 - 1; + continue; + } + + // heartbeat is enabled; sleep, waiting for the desired interval + sleep_for(s, ns); + + // if heartbeats were turned off while we were sleeping, reset + if (!heartbeat_enabled) { + continue; + } + + // check if any heartbeats have happened, report as appropriate + t0 = jl_hrtime(); + gc_state = check_heartbeats(gc_state); + tchb = jl_hrtime() - t0; + + // adjust the next sleep duration based on how long the heartbeat + // check took + rs = 1; + while (tchb > 1e9) { + rs++; + tchb -= 1e9; + } + s = heartbeat_interval_s - rs; + ns = 1e9 - tchb; + } +} + +#else // !JL_HEARTBEAT_THREAD + +void jl_init_heartbeat(void) +{ +} + +JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int n_reports, + int reset_reporting_after_s) +{ + return -1; +} + +JL_DLLEXPORT void jl_heartbeat(void) +{ +} + +#endif // JL_HEARTBEAT_THREAD + #ifdef __cplusplus } #endif