Skip to content

Commit

Permalink
handle async termination better (#55440)
Browse files Browse the repository at this point in the history
Fixes #55235

Disables the assertion failure in the scheduler, so that we are more
likely to be able to report the underlying failure and run atexit
handlers successfully. This should clean up some of the error messages
that occur on timeout.
```
julia> sleep(5)
^\
[89829] signal 3: Quit: 3
in expression starting at REPL[1]:1
kevent at /usr/lib/system/libsystem_kernel.dylib (unknown line)
unknown function (ip: 0x0)
Allocations: 830502 (Pool: 830353; Big: 149); GC: 1
Quit: 3
```
  • Loading branch information
vtjnash authored Aug 15, 2024
1 parent b4ebb00 commit 67c1723
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 27 deletions.
2 changes: 1 addition & 1 deletion base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ function start_profile_listener()
# this will prompt any ongoing or pending event to flush also
close(cond)
# error-propagation is not needed, since the errormonitor will handle printing that better
_wait(t)
t === current_task() || _wait(t)
end
finalizer(cond) do c
# if something goes south, still make sure we aren't keeping a reference in C to this
Expand Down
2 changes: 1 addition & 1 deletion base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ function wait(c::GenericCondition; first::Bool=false)
try
return wait()
catch
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
finally
relockall(c.lock, token)
Expand Down
5 changes: 5 additions & 0 deletions base/initdefs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ function atexit(f::Function)
end

function _atexit(exitcode::Cint)
# this current task shouldn't be scheduled anywhere, but if it was (because
# this exit came from a signal for example), then try to clear that state
# to minimize scheduler issues later
ct = current_task()
q = ct.queue; q === nothing || list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
# Don't hold the lock around the iteration, just in case any other thread executing in
# parallel tries to register a new atexit hook while this is running. We don't want to
# block that thread from proceeding, and we can allow it to register its hook which we
Expand Down
4 changes: 2 additions & 2 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ function closewrite(s::LibuvStream)
# try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
sigatomic_end()
iolock_begin()
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we won't get spurious notifications later
Expand Down Expand Up @@ -1076,7 +1076,7 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
# try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
sigatomic_end()
iolock_begin()
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(uvw) != C_NULL
# uvw is still alive,
# so make sure we won't get spurious notifications later
Expand Down
34 changes: 19 additions & 15 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ end

# just wait for a task to be done, no error propagation
function _wait(t::Task)
t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
if !istaskdone(t)
donenotify = t.donenotify::ThreadSynchronizer
lock(donenotify)
Expand Down Expand Up @@ -374,7 +375,6 @@ in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed ta
Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks.
"""
function wait(t::Task; throw=true)
t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
_wait(t)
if throw && istaskfailed(t)
Core.throw(TaskFailedException(t))
Expand Down Expand Up @@ -813,12 +813,15 @@ macro sync_add(expr)
end
end

throwto_repl_task(@nospecialize val) = throwto(getfield(active_repl_backend, :backend_task)::Task, val)

function is_repl_running()
return isdefined(Base, :active_repl_backend) &&
(getfield(active_repl_backend, :backend_task)::Task)._state === task_state_runnable &&
getfield(active_repl_backend, :in_eval)
function repl_backend_task()
@isdefined(active_repl_backend) || return
backend = active_repl_backend
isdefined(backend, :backend_task) || return
backend_task = getfield(active_repl_backend, :backend_task)::Task
if backend_task._state === task_state_runnable && getfield(backend, :in_eval)
return backend_task
end
return
end

# runtime system hook called when a task finishes
Expand All @@ -842,8 +845,9 @@ function task_done_hook(t::Task)
end

if err && !handled && Threads.threadid() == 1
if isa(result, InterruptException) && isempty(Workqueue) && is_repl_running()
throwto_repl_task(result)
if isa(result, InterruptException) && isempty(Workqueue)
backend = repl_backend_task()
backend isa Task && throwto(backend, result)
end
end
# Clear sigatomic before waiting
Expand All @@ -854,11 +858,11 @@ function task_done_hook(t::Task)
# If an InterruptException happens while blocked in the event loop, try handing
# the exception to the REPL task since the current task is done.
# issue #19467
if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue) && is_repl_running()
throwto_repl_task(e)
else
rethrow()
if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue)
backend = repl_backend_task()
backend isa Task && throwto(backend, e)
end
rethrow() # this will terminate the program
end
end

Expand Down Expand Up @@ -1032,7 +1036,7 @@ function schedule(t::Task, @nospecialize(arg); error=false)
# schedule a task to be (re)started with the given value or exception
t._state === task_state_runnable || Base.error("schedule: Task not runnable")
if error
t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t)
q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t)
setfield!(t, :result, arg)
setfield!(t, :_isexception, true)
else
Expand All @@ -1056,7 +1060,7 @@ function yield()
try
wait()
catch
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
end
end
Expand Down
15 changes: 15 additions & 0 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT
return 0;
}

void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// equivalent to wake_thread, without the assert on wasrunning
int8_t state = jl_atomic_load_relaxed(&ptls->sleep_check_state);
if (state == sleeping) {
if (jl_atomic_cmpswap_relaxed(&ptls->sleep_check_state, &state, not_sleeping)) {
// this notification will never be consumed, so we may have now
// introduced some inaccuracy into the count, but that is
// unavoidable with any asynchronous interruption
jl_atomic_fetch_add_relaxed(&n_threads_running, 1);
}
}
}


static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
Expand Down
15 changes: 10 additions & 5 deletions src/signal-handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ void jl_show_sigill(void *_ctx)
#endif
}

void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT;

// make it invalid for a task to return from this point to its stack
// this is generally quite an foolish operation, but does free you up to do
// arbitrary things on this stack now without worrying about corrupt state that
Expand All @@ -439,15 +441,17 @@ void jl_task_frame_noreturn(jl_task_t *ct) JL_NOTSAFEPOINT
ct->eh = NULL;
ct->world_age = 1;
// Force all locks to drop. Is this a good idea? Of course not. But the alternative would probably deadlock instead of crashing.
small_arraylist_t *locks = &ct->ptls->locks;
jl_ptls_t ptls = ct->ptls;
small_arraylist_t *locks = &ptls->locks;
for (size_t i = locks->len; i > 0; i--)
jl_mutex_unlock_nogc((jl_mutex_t*)locks->items[i - 1]);
locks->len = 0;
ct->ptls->in_pure_callback = 0;
ct->ptls->in_finalizer = 0;
ct->ptls->defer_signal = 0;
ptls->in_pure_callback = 0;
ptls->in_finalizer = 0;
ptls->defer_signal = 0;
// forcibly exit GC (if we were in it) or safe into unsafe, without the mandatory safepoint
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_UNSAFE);
jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_UNSAFE);
surprise_wakeup(ptls);
// allow continuing to use a Task that should have already died--unsafe necromancy!
jl_atomic_store_relaxed(&ct->_state, JL_TASK_STATE_RUNNABLE);
}
Expand All @@ -461,6 +465,7 @@ void jl_critical_error(int sig, int si_code, bt_context_t *context, jl_task_t *c
size_t i, n = ct ? *bt_size : 0;
if (sig) {
// kill this task, so that we cannot get back to it accidentally (via an untimely ^C or jlbacktrace in jl_exit)
// and also resets the state of ct and ptls so that some code can run on this task again
jl_task_frame_noreturn(ct);
#ifndef _OS_WINDOWS_
sigset_t sset;
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(uvw) != C_NULL
# uvw is still alive,
# so make sure we won't get spurious notifications later
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Sockets/src/addrinfo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ function getalladdrinfo(host::String)
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we don't get spurious notifications later
Expand Down Expand Up @@ -223,7 +223,7 @@ function getnameinfo(address::Union{IPv4, IPv6})
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we don't get spurious notifications later
Expand Down

0 comments on commit 67c1723

Please sign in to comment.