Skip to content

Commit

Permalink
Adjust heartbeat behavior (#180)
Browse files Browse the repository at this point in the history
* Add heartbeat pause/resume capability

* Add check to avoid negative sleep duration

* Disable heartbeats in `jl_print_task_backtraces()`

`jl_print_task_backtraces()` can run long enough to cause heartbeat
loss, which in turn can trigger printing task backtraces again.
This cannot happen when it is called from the heartbeat thread itself,
so heartbeats are paused only when it is called from any other thread.

* Pause heartbeats for GC

* Address review comment

* Address review comment
  • Loading branch information
kpamnany authored Sep 24, 2024
1 parent a911d00 commit 037dc51
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -3734,6 +3734,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
return recollect;
}

extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
{
JL_PROBE_GC_BEGIN(collection);
Expand Down Expand Up @@ -3775,6 +3778,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
// existence of the thread in the jl_n_threads count.
//
// TODO: concurrently queue objects
jl_heartbeat_pause();
jl_fence();
gc_n_threads = jl_atomic_load_acquire(&jl_n_threads);
gc_all_tls_states = jl_atomic_load_relaxed(&jl_all_tls_states);
Expand Down Expand Up @@ -3806,6 +3810,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)

gc_n_threads = 0;
gc_all_tls_states = NULL;
jl_heartbeat_resume();
jl_safepoint_end_gc();
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
JL_PROBE_GC_END();
Expand Down
18 changes: 17 additions & 1 deletion src/stackwalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,22 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT
}

extern int gc_first_tid;
extern int jl_inside_heartbeat_thread(void);
extern int jl_heartbeat_pause(void);
extern int jl_heartbeat_resume(void);

// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr
// Print backtraces for all live tasks, for all threads, to jl_safe_printf
// stderr. This can take a _long_ time!
JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
{
// disable heartbeats to prevent heartbeat loss while running this,
// unless this is called from the heartbeat thread itself; in that
// situation, the thread is busy running this and it will not be
// updating the missed heartbeats counter
if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_pause();
}

size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
int ctid = -1;
Expand Down Expand Up @@ -1232,6 +1244,10 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
jl_safe_printf("thread (%d) ==== End thread %d\n", ctid, ptls2->tid + 1);
}
jl_safe_printf("thread (%d) ++++ Done\n", ctid);

if (!jl_inside_heartbeat_thread()) {
jl_heartbeat_resume();
}
}

#ifdef __cplusplus
Expand Down
59 changes: 55 additions & 4 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,45 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return 0;
}

// Temporarily pause the heartbeat thread by clearing the enabled flag.
//
// Returns 0 when heartbeats were enabled and have now been paused, or -1
// when they were not enabled, in which case nothing is changed. The
// configured interval is not touched here; jl_heartbeat_resume() checks it
// to tell a paused heartbeat from a disabled one.
JL_DLLEXPORT int jl_heartbeat_pause(void)
{
    if (heartbeat_enabled) {
        // the heartbeat thread re-checks this flag after each sleep
        heartbeat_enabled = 0;
        return 0;
    }
    return -1;
}

// Resume a heartbeat thread that was paused with jl_heartbeat_pause().
//
// Returns 0 when the heartbeat thread was woken, or -1 when nothing was
// resumed.
//
// NOTE(review): if this runs before the heartbeat thread has noticed the
// pause and posted heartbeat_off_sem, the trywait below fails, -1 is
// returned and heartbeat_enabled is left at 0. Confirm that callers which
// ignore the return value tolerate heartbeats staying paused in that case.
JL_DLLEXPORT int jl_heartbeat_resume(void)
{
    // Refuse to resume, checking in this order, when:
    //  - the heartbeat thread is already running,
    //  - no interval is configured (disabled, which is not the same as paused),
    //  - the heartbeat thread is not ready, i.e. heartbeat_off_sem cannot be
    //    taken without blocking.
    // The conditions short-circuit, so the semaphore is only consumed once
    // the first two checks have passed.
    if (heartbeat_enabled || heartbeat_interval_s == 0 ||
        uv_sem_trywait(&heartbeat_off_sem) != 0) {
        return -1;
    }

    // we have been paused, so the counters start afresh
    n_hbs_missed = 0;
    n_hbs_recvd = 0;
    tasks_showed = 0;

    // set the flag first, then wake the heartbeat thread, which waits on
    // heartbeat_on_sem
    heartbeat_enabled = 1;
    uv_sem_post(&heartbeat_on_sem);
    return 0;
}

// heartbeat
JL_DLLEXPORT void jl_heartbeat(void)
{
Expand Down Expand Up @@ -1099,7 +1138,7 @@ void jl_heartbeat_threadfun(void *arg)
uv_sem_post(&heartbeat_off_sem);

// sleep the thread here; this semaphore is posted in
// jl_heartbeat_enable()
// jl_heartbeat_enable() or jl_heartbeat_resume()
uv_sem_wait(&heartbeat_on_sem);

// Set the sleep duration.
Expand All @@ -1111,7 +1150,7 @@ void jl_heartbeat_threadfun(void *arg)
// heartbeat is enabled; sleep, waiting for the desired interval
sleep_for(s, ns);

// if heartbeats were turned off while we were sleeping, reset
// if heartbeats were turned off/paused while we were sleeping, reset
if (!heartbeat_enabled) {
continue;
}
Expand All @@ -1122,13 +1161,15 @@ void jl_heartbeat_threadfun(void *arg)
tchb = jl_hrtime() - t0;

// adjust the next sleep duration based on how long the heartbeat
// check took
// check took, but if it took too long then use the normal duration
rs = 1;
while (tchb > 1e9) {
rs++;
tchb -= 1e9;
}
s = heartbeat_interval_s - rs;
if (rs < heartbeat_interval_s) {
s = heartbeat_interval_s - rs;
}
ns = 1e9 - tchb;
}
}
Expand All @@ -1150,6 +1191,16 @@ JL_DLLEXPORT int jl_heartbeat_enable(int heartbeat_s, int show_tasks_after_n,
return -1;
}

// Fallback jl_heartbeat_pause(): does nothing and always returns -1.
// NOTE(review): presumably the variant built when heartbeat support is
// unavailable; the enclosing preprocessor condition is not visible here --
// confirm.
JL_DLLEXPORT int jl_heartbeat_pause(void)
{
return -1;
}

// Fallback jl_heartbeat_resume(): does nothing and always returns -1.
// NOTE(review): presumably the variant built when heartbeat support is
// unavailable; the enclosing preprocessor condition is not visible here --
// confirm.
JL_DLLEXPORT int jl_heartbeat_resume(void)
{
return -1;
}

// Fallback jl_heartbeat(): intentionally empty, so calling it has no effect.
// NOTE(review): presumably the variant built when heartbeat support is
// unavailable; the enclosing preprocessor condition is not visible here --
// confirm.
JL_DLLEXPORT void jl_heartbeat(void)
{
}
Expand Down

0 comments on commit 037dc51

Please sign in to comment.