From ea9d1c2f16dc8a297479c328d781bca5a4a9137d Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Mon, 19 Aug 2024 13:17:44 -0300 Subject: [PATCH 1/6] Fix external IO loop thead interaction and add function to Base.Experimental to facilitate it's use. Also add a test. --- base/experimental.jl | 24 +++++++++++++++++++++++ src/scheduler.c | 2 +- test/threads.jl | 46 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/base/experimental.jl b/base/experimental.jl index 58c7258120f3f..0409e5f545183 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -457,4 +457,28 @@ without adding them to the global method table. """ :@MethodTable +""" + Base.Experimental.make_io_thread() + +Create a new thread that will run the Julia IO loop. This can potentially reduce the latency of some +IO operations as they no longer depend on the main thread to run it. This does mean that code that uses +this as implicit synchronization needs to be checked for correctness. +""" +function make_io_thread() + tid = UInt[0] + threadwork = @cfunction function(arg::Ptr{Cvoid}) + current_task().donenotify = Base.ThreadSynchronizer() #TODO: Should this happen by default in adopt thread? + Base.errormonitor(current_task()) # this may not go particularly well if the IO loop is dead, but try anyways + @ccall jl_set_io_loop_tid((Threads.threadid() - 1)::Int16)::Cvoid + wait() # spin uv_run as long as needed + nothing + end Cvoid (Ptr{Cvoid},) + err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, C_NULL::Ptr{Cvoid})::Cint + err == 0 || Base.uv_error("uv_thread_create", err) + @ccall uv_thread_detach(tid::Ptr{UInt})::Cint + err == 0 || Base.uv_error("uv_thread_detach", err) + # n.b. this does not wait for the thread to start or to take ownership of the event loop + nothing +end + end diff --git a/src/scheduler.c b/src/scheduler.c index 3cf97ba108873..3287b66edddfa 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -447,7 +447,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // responsibility, so need to make sure thread 0 will take care // of us. if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock - jl_wakeup_thread(0); + jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid)); } if (uvlock) { int enter_eventloop = may_sleep(ptls); diff --git a/test/threads.jl b/test/threads.jl index 7b4558091022b..3e2f053326cdc 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -366,3 +366,49 @@ end @test jl_setaffinity(0, mask, cpumasksize) == 0 end end + +@testset "io_thread" begin + function io_thread_test() + # This test creates a thread that does IO and then blocks the main julia thread + # This test hangs if you don't spawn an IO thread. + # It hanging or not is technically a race but I haven't seen julia win that race yet. + cmd = """ + Base.Experimental.make_io_thread() + function callback()::Cvoid + println("Running a command") + run(`echo 42`) + return + end + function call_on_thread(callback::Ptr{Nothing}) + tid = UInt[0] + threadwork = @cfunction function(arg::Ptr{Cvoid}) + current_task().donenotify = Base.ThreadSynchronizer() + Base.errormonitor(current_task()) + println("Calling Julia from thread") + ccall(arg, Cvoid, ()) + nothing + end Cvoid (Ptr{Cvoid},) + err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, callback::Ptr{Cvoid})::Cint + err == 0 || Base.uv_error("uv_thread_create", err) + err = @ccall uv_thread_join(tid::Ptr{UInt})::Cint + err == 0 || Base.uv_error("uv_thread_join", err) + return + end + function main() + callback_ptr = @cfunction(callback, Cvoid, ()) + call_on_thread(callback_ptr) + println("Done") + end + main() + + """ + proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false) + sleep(2) # Is there a better way to do this? + if process_running(proc) + kill(proc) + throw(ErrorException("Process did not exit in time")) + end + return true + end + @test io_thread_test() +end From 26a234fd8786c117e39d6bbd650faf079c98acbb Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Mon, 19 Aug 2024 14:26:54 -0300 Subject: [PATCH 2/6] Initialize task lock in adopt thread --- base/task.jl | 5 +++++ src/threading.c | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/base/task.jl b/base/task.jl index 5e4af6747f128..cf55b7679622a 100644 --- a/base/task.jl +++ b/base/task.jl @@ -866,6 +866,11 @@ function task_done_hook(t::Task) end end +function init_task_lock(t::Task) # Function only called from jl_adopt_thread so foreign tasks have a lock. + if t.donenotify === nothing + t.donenotify = ThreadSynchronizer() + end +end ## scheduler and work queue diff --git a/src/threading.c b/src/threading.c index 8f37ee814056c..477070b1b2b12 100644 --- a/src/threading.c +++ b/src/threading.c @@ -413,6 +413,28 @@ jl_ptls_t jl_init_threadtls(int16_t tid) return ptls; } +static _Atomic(jl_function_t*) init_task_lock_func JL_GLOBALLY_ROOTED = NULL; + +static void jl_init_task_lock(jl_task_t *ct) +{ + jl_function_t *done = jl_atomic_load_relaxed(&init_task_lock_func); + if (done == NULL) { + done = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("init_task_lock")); + if (done != NULL) + jl_atomic_store_release(&init_task_lock_func, done); + } + if (done != NULL) { + jl_value_t *args[2] = {done, (jl_value_t*)ct}; + JL_TRY { + jl_apply(args, 2); + } + JL_CATCH { + jl_no_exc_handler(jl_current_exception(ct), ct); + } + } +} + + JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void) { // `jl_init_threadtls` puts us in a GC unsafe region, so ensure GC isn't running. @@ -435,6 +457,7 @@ JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void) JL_GC_PROMISE_ROOTED(ct); uv_random(NULL, NULL, &ct->rngState, sizeof(ct->rngState), 0, NULL); jl_atomic_fetch_add(&jl_gc_disable_counter, -1); + jl_init_task_lock(ct); return &ct->gcstack; } From f107c6b85f3b91454d473391a59d006792d04e2f Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Tue, 20 Aug 2024 10:37:15 -0300 Subject: [PATCH 3/6] Make test GC safe --- test/threads.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/threads.jl b/test/threads.jl index 3e2f053326cdc..11e5b28a71b31 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -390,7 +390,9 @@ end end Cvoid (Ptr{Cvoid},) err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, callback::Ptr{Cvoid})::Cint err == 0 || Base.uv_error("uv_thread_create", err) + gc_state = @ccall jl_gc_safe_enter()::Int8 err = @ccall uv_thread_join(tid::Ptr{UInt})::Cint + @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid err == 0 || Base.uv_error("uv_thread_join", err) return end From 05384ce586e23162f69d2004deca36086f38e35f Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Fri, 30 Aug 2024 10:24:58 -0300 Subject: [PATCH 4/6] Initialize world counter in adopted threads root task --- src/threading.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/threading.c b/src/threading.c index aedd680194eb0..b41f49cb77988 100644 --- a/src/threading.c +++ b/src/threading.c @@ -423,6 +423,7 @@ JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void) JL_GC_PROMISE_ROOTED(ct); uv_random(NULL, NULL, &ct->rngState, sizeof(ct->rngState), 0, NULL); jl_atomic_fetch_add(&jl_gc_disable_counter, -1); + ct->world_age = jl_get_world_counter(); // root_task sets world_age to 1 jl_init_task_lock(ct); return &ct->gcstack; } From f9e99b5d019c3c1d265fc31e0d75d63722e72baf Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Mon, 16 Sep 2024 17:20:59 -0300 Subject: [PATCH 5/6] Increase timeout of test to 10 seconds --- test/threads.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/threads.jl b/test/threads.jl index a696d94bed953..392a5eaa0fe80 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -398,7 +398,7 @@ end """ proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false) - sleep(2) # Is there a better way to do this? + sleep(10) # Is there a better way to do this? if process_running(proc) kill(proc) throw(ErrorException("Process did not exit in time")) From 9765da8b74cc545aeae4d565ae3f7c07b026c2fb Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Tue, 15 Oct 2024 15:56:45 -0300 Subject: [PATCH 6/6] Apply suggestions from code review --- test/threads.jl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/threads.jl b/test/threads.jl index 392a5eaa0fe80..f41aa90f54216 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -398,11 +398,9 @@ end """ proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false) - sleep(10) # Is there a better way to do this? - if process_running(proc) - kill(proc) - throw(ErrorException("Process did not exit in time")) - end + t = Timer(60) do t; kill(proc); end; + @test success(proc) + close(t) return true end @test io_thread_test()