Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix external IO loop thead interaction and add function to Base.Experimental to facilitate it's use. Also add a test. #55529

Merged
merged 10 commits into from
Nov 7, 2024
24 changes: 24 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -457,4 +457,28 @@ without adding them to the global method table.
"""
:@MethodTable

"""
Base.Experimental.make_io_thread()

Create a new thread that will run the Julia IO loop. This can potentially reduce the latency of some
IO operations as they no longer depend on the main thread to run it. This does mean that code that uses
this as implicit synchronization needs to be checked for correctness.
"""
function make_io_thread()
tid = UInt[0]
threadwork = @cfunction function(arg::Ptr{Cvoid})
current_task().donenotify = Base.ThreadSynchronizer() #TODO: Should this happen by default in adopt thread?
Base.errormonitor(current_task()) # this may not go particularly well if the IO loop is dead, but try anyways
@ccall jl_set_io_loop_tid((Threads.threadid() - 1)::Int16)::Cvoid
wait() # spin uv_run as long as needed
nothing
end Cvoid (Ptr{Cvoid},)
err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, C_NULL::Ptr{Cvoid})::Cint
err == 0 || Base.uv_error("uv_thread_create", err)
@ccall uv_thread_detach(tid::Ptr{UInt})::Cint
err == 0 || Base.uv_error("uv_thread_detach", err)
# n.b. this does not wait for the thread to start or to take ownership of the event loop
nothing
end

end
5 changes: 5 additions & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,11 @@ function task_done_hook(t::Task)
end
end

function init_task_lock(t::Task) # Function only called from jl_adopt_thread so foreign tasks have a lock.
if t.donenotify === nothing
t.donenotify = ThreadSynchronizer()
end
end

## scheduler and work queue

Expand Down
3 changes: 2 additions & 1 deletion src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
wakeup_thread(ct, 0);
jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));

}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
Expand Down
24 changes: 24 additions & 0 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,28 @@ jl_ptls_t jl_init_threadtls(int16_t tid)
return ptls;
}

static _Atomic(jl_function_t*) init_task_lock_func JL_GLOBALLY_ROOTED = NULL;

static void jl_init_task_lock(jl_task_t *ct)
{
jl_function_t *done = jl_atomic_load_relaxed(&init_task_lock_func);
if (done == NULL) {
done = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("init_task_lock"));
if (done != NULL)
jl_atomic_store_release(&init_task_lock_func, done);
}
if (done != NULL) {
jl_value_t *args[2] = {done, (jl_value_t*)ct};
JL_TRY {
jl_apply(args, 2);
}
JL_CATCH {
jl_no_exc_handler(jl_current_exception(ct), ct);
}
}
}


JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void)
{
// `jl_init_threadtls` puts us in a GC unsafe region, so ensure GC isn't running.
Expand All @@ -401,6 +423,8 @@ JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void)
JL_GC_PROMISE_ROOTED(ct);
uv_random(NULL, NULL, &ct->rngState, sizeof(ct->rngState), 0, NULL);
jl_atomic_fetch_add(&jl_gc_disable_counter, -1);
ct->world_age = jl_get_world_counter(); // root_task sets world_age to 1
vtjnash marked this conversation as resolved.
Show resolved Hide resolved
jl_init_task_lock(ct);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really needed for all adopted threads? Could we lazy initialize it when somebody tries to wait for the task instead?

return &ct->gcstack;
}

Expand Down
48 changes: 48 additions & 0 deletions test/threads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,51 @@ end
@test jl_setaffinity(0, mask, cpumasksize) == 0
end
end

@testset "io_thread" begin
function io_thread_test()
# This test creates a thread that does IO and then blocks the main julia thread
# This test hangs if you don't spawn an IO thread.
# It hanging or not is technically a race but I haven't seen julia win that race yet.
cmd = """
Base.Experimental.make_io_thread()
function callback()::Cvoid
println("Running a command")
run(`echo 42`)
return
end
function call_on_thread(callback::Ptr{Nothing})
tid = UInt[0]
threadwork = @cfunction function(arg::Ptr{Cvoid})
current_task().donenotify = Base.ThreadSynchronizer()
Base.errormonitor(current_task())
println("Calling Julia from thread")
ccall(arg, Cvoid, ())
nothing
end Cvoid (Ptr{Cvoid},)
err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, callback::Ptr{Cvoid})::Cint
err == 0 || Base.uv_error("uv_thread_create", err)
gc_state = @ccall jl_gc_safe_enter()::Int8
err = @ccall uv_thread_join(tid::Ptr{UInt})::Cint
vtjnash marked this conversation as resolved.
Show resolved Hide resolved
@ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid
err == 0 || Base.uv_error("uv_thread_join", err)
return
end
function main()
callback_ptr = @cfunction(callback, Cvoid, ())
call_on_thread(callback_ptr)
println("Done")
end
main()

"""
proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false)
sleep(2) # Is there a better way to do this?
gbaraldi marked this conversation as resolved.
Show resolved Hide resolved
if process_running(proc)
kill(proc)
throw(ErrorException("Process did not exit in time"))
end
return true
end
@test io_thread_test()
end