diff --git a/base/experimental.jl b/base/experimental.jl index 982ed5e78aa8c..31238d4015b3b 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -457,6 +457,29 @@ without adding them to the global method table. """ :@MethodTable +""" + Base.Experimental.make_io_thread() + +Create a new thread that will run the Julia IO loop. This can potentially reduce the latency of some +IO operations as they no longer depend on the main thread to run it. This does mean that code that uses +this as implicit synchronization needs to be checked for correctness. +""" +function make_io_thread() + tid = UInt[0] + threadwork = @cfunction function(arg::Ptr{Cvoid}) + current_task().donenotify = Base.ThreadSynchronizer() #TODO: Should this happen by default in adopt thread? + Base.errormonitor(current_task()) # this may not go particularly well if the IO loop is dead, but try anyways + @ccall jl_set_io_loop_tid((Threads.threadid() - 1)::Int16)::Cvoid + wait() # spin uv_run as long as needed + nothing + end Cvoid (Ptr{Cvoid},) + err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, C_NULL::Ptr{Cvoid})::Cint + err == 0 || Base.uv_error("uv_thread_create", err) + @ccall uv_thread_detach(tid::Ptr{UInt})::Cint + err == 0 || Base.uv_error("uv_thread_detach", err) + # n.b. this does not wait for the thread to start or to take ownership of the event loop +end + """ Base.Experimental.entrypoint(f, argtypes::Tuple) diff --git a/base/task.jl b/base/task.jl index f3a134f374421..2a922c4b85f24 100644 --- a/base/task.jl +++ b/base/task.jl @@ -849,6 +849,11 @@ function task_done_hook(t::Task) end end +function init_task_lock(t::Task) # Function only called from jl_adopt_thread so foreign tasks have a lock. + if t.donenotify === nothing + t.donenotify = ThreadSynchronizer() + end +end ## scheduler and work queue diff --git a/src/scheduler.c b/src/scheduler.c index fff891d91a813..731a0c5146605 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -437,7 +437,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, // responsibility, so need to make sure thread 0 will take care // of us. if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock - wakeup_thread(ct, 0); + jl_wakeup_thread(jl_atomic_load_relaxed(&io_loop_tid)); + } if (uvlock) { int enter_eventloop = may_sleep(ptls); diff --git a/src/threading.c b/src/threading.c index 50944a24eb29b..42174830d9b43 100644 --- a/src/threading.c +++ b/src/threading.c @@ -401,6 +401,28 @@ jl_ptls_t jl_init_threadtls(int16_t tid) return ptls; } +static _Atomic(jl_function_t*) init_task_lock_func JL_GLOBALLY_ROOTED = NULL; + +static void jl_init_task_lock(jl_task_t *ct) +{ + jl_function_t *done = jl_atomic_load_relaxed(&init_task_lock_func); + if (done == NULL) { + done = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("init_task_lock")); + if (done != NULL) + jl_atomic_store_release(&init_task_lock_func, done); + } + if (done != NULL) { + jl_value_t *args[2] = {done, (jl_value_t*)ct}; + JL_TRY { + jl_apply(args, 2); + } + JL_CATCH { + jl_no_exc_handler(jl_current_exception(ct), ct); + } + } +} + + JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void) { // `jl_init_threadtls` puts us in a GC unsafe region, so ensure GC isn't running. @@ -423,6 +445,8 @@ JL_DLLEXPORT jl_gcframe_t **jl_adopt_thread(void) JL_GC_PROMISE_ROOTED(ct); uv_random(NULL, NULL, &ct->rngState, sizeof(ct->rngState), 0, NULL); jl_atomic_fetch_add(&jl_gc_disable_counter, -1); + ct->world_age = jl_get_world_counter(); // root_task sets world_age to 1 + jl_init_task_lock(ct); return &ct->gcstack; } diff --git a/test/threads.jl b/test/threads.jl index d5a801c1a6a1c..4d928ca05da16 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -360,6 +360,52 @@ end end end +@testset "io_thread" begin + function io_thread_test() + # This test creates a thread that does IO and then blocks the main julia thread + # This test hangs if you don't spawn an IO thread. + # It hanging or not is technically a race but I haven't seen julia win that race yet. + cmd = """ + Base.Experimental.make_io_thread() + function callback()::Cvoid + println("Running a command") + run(`echo 42`) + return + end + function call_on_thread(callback::Ptr{Nothing}) + tid = UInt[0] + threadwork = @cfunction function(arg::Ptr{Cvoid}) + current_task().donenotify = Base.ThreadSynchronizer() + Base.errormonitor(current_task()) + println("Calling Julia from thread") + ccall(arg, Cvoid, ()) + nothing + end Cvoid (Ptr{Cvoid},) + err = @ccall uv_thread_create(tid::Ptr{UInt}, threadwork::Ptr{Cvoid}, callback::Ptr{Cvoid})::Cint + err == 0 || Base.uv_error("uv_thread_create", err) + gc_state = @ccall jl_gc_safe_enter()::Int8 + err = @ccall uv_thread_join(tid::Ptr{UInt})::Cint + @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid + err == 0 || Base.uv_error("uv_thread_join", err) + return + end + function main() + callback_ptr = @cfunction(callback, Cvoid, ()) + call_on_thread(callback_ptr) + println("Done") + end + main() + + """ + proc = run(pipeline(`$(Base.julia_cmd()) -e $cmd`), wait=false) + t = Timer(60) do t; kill(proc); end; + @test success(proc) + close(t) + return true + end + @test io_thread_test() +end + # Make sure default number of BLAS threads respects CPU affinity: issue #55572. @testset "LinearAlgebra number of default threads" begin if AFFINITY_SUPPORTED