Skip to content

Commit

Permalink
Fix external IO loop thead interaction and add function to Base.Exper…
Browse files Browse the repository at this point in the history
…imental to facilitate it's use. Also add a test. (#55529)

While looking at #55525 I found
that the implementation wasn't working correctly.
I added it to Base.Experimental so people don't need to handroll their
own and am also testing a version of what the issue was hitting.
  • Loading branch information
gbaraldi authored Nov 7, 2024
1 parent 9e14bf8 commit 5848445
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 1 deletion.
23 changes: 23 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,29 @@ without adding them to the global method table.
"""
:@MethodTable

"""
Base.Experimental.make_io_thread()
Create a new thread that will run the Julia IO loop. This can potentially reduce the latency of some
IO operations as they no longer depend on the main thread to run it. This does mean that code that uses
this as implicit synchronization needs to be checked for correctness.
"""
function make_io_thread()
tid = UInt[0]
threadwork = @cfunction function(arg::Ptr{Cvoid})
current_task().donenotify = Base.ThreadSynchronizer() #TODO: Should this happen by default in adopt thread?
Base.errormonitor(current_task()) # this may not go particularly well if the IO loop is dead, but try anyways
@ccall jl_set_io_loop_tid((Threads.threadid() - 1)::Int16)::Cvoid
wait() # spin uv_run as long as needed
nothing
end Cvoid (Ptr{Cvoid},)
err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, C_NULL::Ptr{Cvoid})::Cint
err == 0 || Base.uv_error("uv_thread_create", err)
@ccall uv_thread_detach(tid::Ptr{UInt})::Cint
err == 0 || Base.uv_error("uv_thread_detach", err)
# n.b. this does not wait for the thread to start or to take ownership of the event loop
end

"""
Base.Experimental.entrypoint(f, argtypes::Tuple)
Expand Down
5 changes: 5 additions & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,11 @@ function task_done_hook(t::Task)
end
end

function init_task_lock(t::Task) # Function only called from jl_adopt_thread so foreign tasks have a lock.
if t.donenotify === nothing
t.donenotify = ThreadSynchronizer()
end
end

## scheduler and work queue

Expand Down
3 changes: 2 additions & 1 deletion src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// responsibility, so need to make sure thread 0 will take care
// of us.
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
wakeup_thread(ct, 0);
jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid));

}
if (uvlock) {
int enter_eventloop = may_sleep(ptls);
Expand Down
24 changes: 24 additions & 0 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,28 @@ jl_ptls_t jl_init_threadtls(int16_t tid)
return ptls;
}

static _Atomic(jl_function_t*) init_task_lock_func JL_GLOBALLY_ROOTED = NULL;

static void jl_init_task_lock(jl_task_t *ct)
{
jl_function_t *done = jl_atomic_load_relaxed(&init_task_lock_func);
if (done == NULL) {
done = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("init_task_lock"));
if (done != NULL)
jl_atomic_store_release(&init_task_lock_func, done);
}
if (done != NULL) {
jl_value_t *args[2] = {done, (jl_value_t*)ct};
JL_TRY {
jl_apply(args, 2);
}
JL_CATCH {
jl_no_exc_handler(jl_current_exception(ct), ct);
}
}
}


JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void)
{
// `jl_init_threadtls` puts us in a GC unsafe region, so ensure GC isn't running.
Expand All @@ -423,6 +445,8 @@ JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void)
JL_GC_PROMISE_ROOTED(ct);
uv_random(NULL, NULL, &ct->rngState, sizeof(ct->rngState), 0, NULL);
jl_atomic_fetch_add(&jl_gc_disable_counter, -1);
ct->world_age = jl_get_world_counter(); // root_task sets world_age to 1
jl_init_task_lock(ct);
return &ct->gcstack;
}

Expand Down
46 changes: 46 additions & 0 deletions test/threads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,52 @@ end
end
end

@testset "io_thread" begin
function io_thread_test()
# This test creates a thread that does IO and then blocks the main julia thread
# This test hangs if you don't spawn an IO thread.
# It hanging or not is technically a race but I haven't seen julia win that race yet.
cmd = """
Base.Experimental.make_io_thread()
function callback()::Cvoid
println("Running a command")
run(`echo 42`)
return
end
function call_on_thread(callback::Ptr{Nothing})
tid = UInt[0]
threadwork = @cfunction function(arg::Ptr{Cvoid})
current_task().donenotify = Base.ThreadSynchronizer()
Base.errormonitor(current_task())
println("Calling Julia from thread")
ccall(arg, Cvoid, ())
nothing
end Cvoid (Ptr{Cvoid},)
err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, callback::Ptr{Cvoid})::Cint
err == 0 || Base.uv_error("uv_thread_create", err)
gc_state = @ccall jl_gc_safe_enter()::Int8
err = @ccall uv_thread_join(tid::Ptr{UInt})::Cint
@ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid
err == 0 || Base.uv_error("uv_thread_join", err)
return
end
function main()
callback_ptr = @cfunction(callback, Cvoid, ())
call_on_thread(callback_ptr)
println("Done")
end
main()
"""
proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false)
t = Timer(60) do t; kill(proc); end;
@test success(proc)
close(t)
return true
end
@test io_thread_test()
end

# Make sure default number of BLAS threads respects CPU affinity: issue #55572.
@testset "LinearAlgebra number of default threads" begin
if AFFINITY_SUPPORTED
Expand Down

0 comments on commit 5848445

Please sign in to comment.