From 2b61c72a5be37a83b6db5a8a1a42b1e8e9237a7f Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Thu, 31 Oct 2024 12:35:30 +0000 Subject: [PATCH] Add per-Task cpu time metric initialize to zero make new task field visible from Julia add start time fix typo add test add a wait import `LinearAlgebra` for `peakflops` update last scheduled on task finish as well no need to update last scheduled in finish add test fix test Add per-task wall-time Test individual task cpu_time less than wall_time More tests Move task timings mostly to julia Record cpu time on entrance to `wait/yield` Rename 'scheduled_at' -> '[en/de]queued_at' Ability to enable/disable task timings globally fixup whitespace Enable task timing in test Enable task timing for root task Prevent `task_timing(false)` decrementing below zero Change task-timing flag to require yes/no arg Rename `timings`->`metrics`, `dequeued`->`started_running` Update task-state-transition comments Remove unused debug function Add NEWS Add fallback recording of cpu time in `wait()` Instrument async io Record task cpu time in `wait()` Make task metrics flag const and counters atomic Mark the new APIs experimental Return task metrics as Int or nothing if disabled Test metrics updating as expected More tests --- base/experimental.jl | 70 +++++++ base/options.jl | 1 + base/task.jl | 52 +++++- doc/man/julia.1 | 4 +- doc/src/manual/command-line-interface.md | 1 + src/init.c | 4 + src/jlapi.c | 22 +++ src/jloptions.c | 14 +- src/jloptions.h | 1 + src/jltypes.c | 24 ++- src/julia.h | 13 ++ src/julia_internal.h | 3 + src/task.c | 29 +++ src/threading.c | 2 + test/cmdlineargs.jl | 9 + test/threads_exec.jl | 227 +++++++++++++++++++++++ 16 files changed, 468 insertions(+), 8 deletions(-) diff --git a/base/experimental.jl b/base/experimental.jl index cc8d368023b492..2d761c8ee5ec5c 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -368,4 +368,74 @@ adding them to the global method table. """ :@MethodTable +### Task metrics + +""" + Base.Experimental.task_metrics(::Bool) + +Enable or disable the collection of per-task metrics. +A `Task` created when `Base.task_metrics(true)` is in effect will have +[`Base.Experimental.task_cpu_time_ns`](@ref) and [`Base.Experimental.task_wall_time_ns`](@ref) +timing information available. + +!!! note + Task metrics can be enabled at start-up via the `--task-metrics=yes` command line option. +""" +function task_metrics(b::Bool) + if b + ccall(:jl_task_metrics_enable, Cvoid, ()) + else + ccall(:jl_task_metrics_disable, Cvoid, ()) + end + return nothing +end + +""" + Base.Experimental.task_cpu_time_ns(t::Task) -> Union{Int, Nothing} + +Return the total nanoseconds that the task `t` has spent running. +See also [`Base.Experimental.task_wall_time_ns`](@ref). + +Will be `nothing` if task timings are not enabled. +See [`Base.Experimental.task_metrics`](@ref). + +!!! note "This metric is from the Julia scheduler" + A task may be running on an OS thread that is descheduled by the OS + scheduler, this time still counts towards the metric. + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_cpu_time_ns(t::Task) + t.metrics_enabled || return nothing + if t.last_started_running_at == 0 + return Int(t.cpu_time_ns) + else + return Int(t.cpu_time_ns + (time_ns() - t.last_started_running_at)) + end end + +""" + Base.Experimental.task_wall_time_ns(t::Task) -> Union{Int, Nothing} + +Return the total nanoseconds that the task `t` was runnable. +This is the time since the task entered the run queue until the time at which it completed, +or until the current time if the task has not yet completed. +See also [`task_cpu_time_ns`](@ref). + +Will be `nothing` if task timings are not enabled. +See [`Base.task_metrics`](@ref). + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_wall_time_ns(t::Task) + t.metrics_enabled || return nothing + start_at = t.first_enqueued_at + start_at == 0 && return 0 + end_at = t.finished_at + end_at == 0 && return Int(time_ns() - start_at) + return Int(end_at - start_at) +end + +end # module diff --git a/base/options.jl b/base/options.jl index a01a3f553b157c..2e779c964e7fd9 100644 --- a/base/options.jl +++ b/base/options.jl @@ -60,6 +60,7 @@ struct JLOptions heap_size_hint::UInt64 trace_compile_timing::Int8 safe_crash_log_file::Ptr{UInt8} + task_metrics::Int8 end # This runs early in the sysimage != is not defined yet diff --git a/base/task.jl b/base/task.jl index 137b0f7c4a3f66..ac5580c5b000d2 100644 --- a/base/task.jl +++ b/base/task.jl @@ -810,7 +810,11 @@ function enq_work(t::Task) return t end -schedule(t::Task) = enq_work(t) +function schedule(t::Task) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) + enq_work(t) +end """ schedule(t::Task, [val]; error=false) @@ -857,6 +861,8 @@ function schedule(t::Task, @nospecialize(arg); error=false) t.queue === nothing || Base.error("schedule: Task not runnable") setfield!(t, :result, arg) end + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) enq_work(t) return t end @@ -888,7 +894,13 @@ A fast, unfair-scheduling version of `schedule(t, arg); yield()` which immediately yields to `t` before calling the scheduler. """ function yield(t::Task, @nospecialize(x=nothing)) - (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable") + current = current_task() + # [task] user_time -yield-> wait_time + record_cpu_time!(current) + t === current && throw(ConcurrencyViolationError("Cannot yield to currently running task!")) + (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable")) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) t.result = x enq_work(current_task()) set_next_task(t) @@ -904,6 +916,8 @@ call to `yieldto`. This is a low-level call that only switches tasks, not consid or scheduling in any way. Its use is discouraged. """ function yieldto(t::Task, @nospecialize(x=nothing)) + # [task] user_time -yield-> wait_time + record_cpu_time!(current_task()) # TODO: these are legacy behaviors; these should perhaps be a scheduler # state error instead. if t._state === task_state_done @@ -911,6 +925,8 @@ function yieldto(t::Task, @nospecialize(x=nothing)) elseif t._state === task_state_failed throw(t.result) end + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = x set_next_task(t) return try_yieldto(identity) @@ -924,6 +940,11 @@ function try_yieldto(undo) rethrow() end ct = current_task() + # [task] wait_time -(re)started-> user_time + if ct.metrics_enabled + @assert ct.last_started_running_at == 0 + @atomic :monotonic ct.last_started_running_at = time_ns() + end if ct._isexception exc = ct.result ct.result = nothing @@ -937,6 +958,10 @@ end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) + # [task] user_time -yield-> wait_time + record_cpu_time!(current_task()) + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = exc t._isexception = true set_next_task(t) @@ -989,6 +1014,8 @@ checktaskempty = Partr.multiq_check_empty end function wait() + # [task] user_time -yield-or-done-> wait_time + record_cpu_time!(current_task()) GC.safepoint() W = workqueue_for(Threads.threadid()) poptask(W) @@ -1003,3 +1030,24 @@ if Sys.iswindows() else pause() = ccall(:pause, Cvoid, ()) end + +# update the `cpu_time_ns` field of `t` to include the time since it last started running. +function record_cpu_time!(t::Task) + if t.metrics_enabled + stopped_at = t.finished_at == 0 ? time_ns() : t.finished_at + @assert t.last_started_running_at != 0 + @atomic :monotonic t.cpu_time_ns += stopped_at - t.last_started_running_at + # set to 0 to indicate that the task is not running + @atomic :monotonic t.last_started_running_at = 0 + end + return t +end +# if this is the first time `t` has been added to the run queue +# (or the first time it has been unfairly yielded to without being added to the run queue) +# then set the `first_enqueued_at` field to the current time. +function maybe_record_enqueued!(t::Task) + if t.metrics_enabled && t.first_enqueued_at == 0 + @atomic :monotonic t.first_enqueued_at = time_ns() + end + return t +end diff --git a/doc/man/julia.1 b/doc/man/julia.1 index e7da6e352a96a1..aef5d9be325754 100644 --- a/doc/man/julia.1 +++ b/doc/man/julia.1 @@ -271,6 +271,9 @@ Print precompile statements for methods compiled during execution or save to a p If --trace-compile is enabled show how long each took to compile in ms .TP +--task-metrics={yes|no*} +Enable the collection of per-task metrics. + -image-codegen Force generate code in imaging mode @@ -281,6 +284,5 @@ See https://docs.julialang.org/en/v1/manual/environment-variables/ Please report any bugs using the GitHub issue tracker: https://github.com/julialang/julia/issues?state=open - .SH AUTHORS Contributors: https://github.com/JuliaLang/julia/graphs/contributors diff --git a/doc/src/manual/command-line-interface.md b/doc/src/manual/command-line-interface.md index d08e3fbfd2181a..0c380fa2c0688b 100644 --- a/doc/src/manual/command-line-interface.md +++ b/doc/src/manual/command-line-interface.md @@ -130,6 +130,7 @@ The following is a complete list of command-line switches available when launchi |`--code-coverage=tracefile.info` |Append coverage information to the LCOV tracefile (filename supports format tokens).| |`--track-allocation[={none*\|user\|all}]` |Count bytes allocated by each source line (omitting setting is equivalent to "user")| |`--track-allocation=@` |Count bytes but only in files that fall under the given file path/directory. The `@` prefix is required to select this option. A `@` with no path will track the current directory.| +|`--task-metrics={yes\|no*}` |Enable the collection of per-task metrics| |`--bug-report=KIND` |Launch a bug report session. It can be used to start a REPL, run a script, or evaluate expressions. It first tries to use BugReporting.jl installed in current environment and falls back to the latest compatible BugReporting.jl if not. For more information, see `--bug-report=help`.| |`--compile={yes*\|no\|all\|min}` |Enable or disable JIT compiler, or request exhaustive or minimal compilation| |`--output-o ` |Generate an object file (including system image data)| diff --git a/src/init.c b/src/init.c index 9a49ad60c02f4e..dae5afb3759163 100644 --- a/src/init.c +++ b/src/init.c @@ -851,6 +851,10 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) #if defined(_COMPILER_GCC_) && __GNUC__ >= 12 #pragma GCC diagnostic ignored "-Wdangling-pointer" #endif + if (jl_options.task_metrics == JL_OPTIONS_TASK_METRICS_ON) { + // enable before creating the root task so it gets timings too. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); + } // warning: this changes `jl_current_task`, so be careful not to call that from this function jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi); #pragma GCC diagnostic pop diff --git a/src/jlapi.c b/src/jlapi.c index 0dffaac6272884..ef9ce4d2df2ae7 100644 --- a/src/jlapi.c +++ b/src/jlapi.c @@ -509,6 +509,28 @@ JL_DLLEXPORT uint64_t jl_cumulative_recompile_time_ns(void) return jl_atomic_load_relaxed(&jl_cumulative_recompile_time); } +/** + * @brief Enable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_enable(void) +{ + // Increment the flag to allow reentrant callers. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); +} + +/** + * @brief Disable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_disable(void) +{ + // Prevent decrementing the counter below zero + uint8_t enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled); + while (enabled > 0) { + if (jl_atomic_cmpswap(&jl_task_metrics_enabled, &enabled, enabled-1)) + break; + } +} + JL_DLLEXPORT void jl_get_fenv_consts(int *ret) { ret[0] = FE_INEXACT; diff --git a/src/jloptions.c b/src/jloptions.c index 40fc3fe7f4ab50..919c8830556c3c 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -93,6 +93,7 @@ JL_DLLEXPORT void jl_init_options(void) 0, // heap-size-hint 0, // trace_compile_timing NULL, // safe_crash_log_file + 0, // task_metrics }; jl_options_initialized = 1; } @@ -206,7 +207,7 @@ static const char opts_hidden[] = " --strip-metadata Remove docstrings and source location info from system image\n" " --strip-ir Remove IR (intermediate representation) of compiled functions\n\n" - // compiler debugging (see the devdocs for tips on using these options) + // compiler debugging and experimental (see the devdocs for tips on using these options) " --output-unopt-bc Generate unoptimized LLVM bitcode (.bc)\n" " --output-bc Generate LLVM bitcode (.bc)\n" " --output-asm Generate an assembly file (.s)\n" @@ -215,6 +216,7 @@ static const char opts_hidden[] = " --trace-compile={stderr,name}\n" " Print precompile statements for methods compiled during execution or save to a path\n" " --trace-compile-timing If --trace-compile is enabled show how long each took to compile in ms\n" + " --task-metrics={yes|no*} Enable collection of per-task timing data.\n" " --image-codegen Force generate code in imaging mode\n" " --permalloc-pkgimg={yes|no*} Copy the data section of package images into memory\n" ; @@ -239,6 +241,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_trace_compile, opt_trace_compile_timing, opt_trace_dispatch, + opt_task_metrics, opt_math_mode, opt_worker, opt_bind_to, @@ -316,6 +319,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "polly", required_argument, 0, opt_polly }, { "trace-compile", required_argument, 0, opt_trace_compile }, { "trace-dispatch", required_argument, 0, opt_trace_dispatch }, + { "task-metrics", required_argument, 0, opt_task_metrics }, { "math-mode", required_argument, 0, opt_math_mode }, { "handle-signals", required_argument, 0, opt_handle_signals }, // hidden command line options @@ -872,6 +876,14 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) if (jl_options.safe_crash_log_file == NULL) jl_error("julia: failed to allocate memory for --safe-crash-log-file"); break; + case opt_task_metrics: + if (!strcmp(optarg, "no")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_OFF; + else if (!strcmp(optarg, "yes")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_ON; + else + jl_errorf("julia: invalid argument to --task-metrics={yes|no} (%s)", optarg); + break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index 63f639805b3002..d9cf96e82f2a45 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -64,6 +64,7 @@ typedef struct { uint64_t heap_size_hint; int8_t trace_compile_timing; const char *safe_crash_log_file; + int8_t task_metrics; } jl_options_t; #endif diff --git a/src/jltypes.c b/src/jltypes.c index e9d843c383a9c0..1f0cdd53c424ac 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -3222,7 +3222,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(16, + jl_perm_symsvec(21, "next", "queue", "storage", @@ -3238,8 +3238,13 @@ void jl_init_types(void) JL_GC_DISABLED "_state", "sticky", "_isexception", - "priority"), - jl_svec(16, + "priority", + "metrics_enabled", + "first_enqueued_at", + "last_started_running_at", + "cpu_time_ns", + "finished_at"), + jl_svec(21, jl_any_type, jl_any_type, jl_any_type, @@ -3255,12 +3260,23 @@ void jl_init_types(void) JL_GC_DISABLED jl_uint8_type, jl_bool_type, jl_bool_type, - jl_uint16_type), + jl_uint16_type, + jl_bool_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type), jl_emptysvec, 0, 1, 6); XX(task); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type); jl_svecset(jl_task_type->types, 0, listt); + // Set field 17 (metrics_enabled) as const + // Set fields 13 (_state) and 18-21 (metric counters) as atomic + const static uint32_t task_constfields[1] = { 0b000010000000000000000 }; + const static uint32_t task_atomicfields[1] = { 0b111100001000000000000 }; + jl_task_type->name->constfields = task_constfields; + jl_task_type->name->atomicfields = task_atomicfields; jl_binding_type = jl_new_datatype(jl_symbol("Binding"), core, jl_any_type, jl_emptysvec, diff --git a/src/julia.h b/src/julia.h index f34bb1623eed50..b1128e599e66e4 100644 --- a/src/julia.h +++ b/src/julia.h @@ -2043,6 +2043,16 @@ typedef struct _jl_task_t { // uint8_t padding1; // multiqueue priority uint16_t priority; + // flag indicating whether or not to record timing metrics for this task + uint8_t metrics_enabled; + // timestamp this task first entered the run queue (TODO: int32 of ms instead?) + _Atomic(uint64_t) first_enqueued_at; + // timestamp this task was most recently scheduled to run + _Atomic(uint64_t) last_started_running_at; + // time this task has spent running; updated when it yields or finishes. + _Atomic(uint64_t) cpu_time_ns; + // timestamp this task finished (i.e. entered state DONE or FAILED). + _Atomic(uint64_t) finished_at; // hidden state: @@ -2324,6 +2334,9 @@ JL_DLLEXPORT int jl_generating_output(void) JL_NOTSAFEPOINT; #define JL_OPTIONS_USE_PKGIMAGES_YES 1 #define JL_OPTIONS_USE_PKGIMAGES_NO 0 +#define JL_OPTIONS_TASK_METRICS_OFF 0 +#define JL_OPTIONS_TASK_METRICS_ON 1 + // Version information #include // Generated file diff --git a/src/julia_internal.h b/src/julia_internal.h index fc719c45c57539..902e831e799879 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -303,6 +303,9 @@ extern JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time; +// Global *atomic* integer controlling *process-wide* task timing. +extern JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled; + #define jl_return_address() ((uintptr_t)__builtin_return_address(0)) STATIC_INLINE uint32_t jl_int32hash_fast(uint32_t a) diff --git a/src/task.c b/src/task.c index 86033a81ddf412..8a5cf5277ef64d 100644 --- a/src/task.c +++ b/src/task.c @@ -295,6 +295,11 @@ void JL_NORETURN jl_finish_task(jl_task_t *t) jl_task_t *ct = jl_current_task; JL_PROBE_RT_FINISH_TASK(ct); JL_SIGATOMIC_BEGIN(); + if (ct->metrics_enabled) { + // [task] user_time -finished-> wait_time + assert(jl_atomic_load_relaxed(&t->first_enqueued_at) != 0); + jl_atomic_store_relaxed(&t->finished_at, jl_hrtime()); + } if (jl_atomic_load_relaxed(&t->_isexception)) jl_atomic_store_release(&t->_state, JL_TASK_STATE_FAILED); else @@ -1084,6 +1089,11 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->ptls = NULL; t->world_age = ct->world_age; t->reentrant_timing = 0; + t->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + jl_atomic_store_relaxed(&t->first_enqueued_at, 0); + jl_atomic_store_relaxed(&t->last_started_running_at, 0); + jl_atomic_store_relaxed(&t->cpu_time_ns, 0); + jl_atomic_store_relaxed(&t->finished_at, 0); jl_timing_task_init(t); #ifdef COPY_STACKS @@ -1220,6 +1230,12 @@ CFI_NORETURN #endif ct->started = 1; + if (ct->metrics_enabled) { + // [task] wait_time -started-> user_time + assert(jl_atomic_load_relaxed(&ct->first_enqueued_at) != 0); + assert(jl_atomic_load_relaxed(&ct->last_started_running_at) == 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, jl_hrtime()); + } JL_PROBE_RT_START_TASK(ct); jl_timing_block_task_enter(ct, ptls, NULL); if (jl_atomic_load_relaxed(&ct->_isexception)) { @@ -1680,6 +1696,19 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) ct->ptls = ptls; ct->world_age = 1; // OK to run Julia code on this task ct->reentrant_timing = 0; + jl_atomic_store_relaxed(&ct->cpu_time_ns, 0); + jl_atomic_store_relaxed(&ct->finished_at, 0); + ct->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + if (ct->metrics_enabled) { + // [task] created -started-> user_time + uint64_t now = jl_hrtime(); + jl_atomic_store_relaxed(&ct->first_enqueued_at, now); + jl_atomic_store_relaxed(&ct->last_started_running_at, now); + } + else { + jl_atomic_store_relaxed(&ct->first_enqueued_at, 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, 0); + } ptls->root_task = ct; jl_atomic_store_relaxed(&ptls->current_task, ct); JL_GC_PROMISE_ROOTED(ct); diff --git a/src/threading.c b/src/threading.c index d0f25de374462c..5a8ae7825354f6 100644 --- a/src/threading.c +++ b/src/threading.c @@ -49,6 +49,8 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; +JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled = 0; + JL_DLLEXPORT void *jl_get_ptls_states(void) { // mostly deprecated: use current_task instead diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index 094fa8741087f2..3597d99eb6d965 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -746,6 +746,15 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` "Int(Base.JLOptions().fast_math)"`)) == JL_OPTIONS_FAST_MATH_DEFAULT end + let JL_OPTIONS_TASK_METRICS_OFF = 0, JL_OPTIONS_TASK_METRICS_ON = 1 + @test parse(Int,readchomp(`$exename -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_OFF + @test parse(Int, readchomp(`$exename --task-metrics=yes -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_ON + @test !parse(Bool, readchomp(`$exename -E "current_task().metrics_enabled"`)) + @test parse(Bool, readchomp(`$exename --task-metrics=yes -E "current_task().metrics_enabled"`)) + end + # --worker takes default / custom as argument (default/custom arguments # tested in test/parallel.jl) @test errors_not_signals(`$exename --worker=true`) diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 9c7c524febeff6..499d2f91eeca35 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -3,6 +3,7 @@ using Test using Base.Threads using Base.Threads: SpinLock, threadpoolsize +using LinearAlgebra: peakflops # for cfunction_closure include("testenv.jl") @@ -1090,4 +1091,230 @@ end end end +@testset "Base.Experimental.task_metrics" begin + t = Task(() -> nothing) + @test_throws "const field" t.metrics_enabled = true + is_task_metrics_enabled() = fetch(Threads.@spawn current_task().metrics_enabled) + @test !is_task_metrics_enabled() + try + @testset "once" begin + Base.Experimental.task_metrics(true) + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + @test !is_task_metrics_enabled() + end + @testset "multiple" begin + Base.Experimental.task_metrics(true) # 1 + Base.Experimental.task_metrics(true) # 2 + Base.Experimental.task_metrics(true) # 3 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 2 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 1 + @test is_task_metrics_enabled() + @sync for i in 1:5 # 0 (not negative) + Threads.@spawn Base.Experimental.task_metrics(false) + end + @test !is_task_metrics_enabled() + Base.Experimental.task_metrics(true) # 1 + @test is_task_metrics_enabled() + end + finally + while is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters" begin + @testset "enabled" begin + try + Base.Experimental.task_metrics(true) + start_time = time_ns() + t = Threads.@spawn peakflops() + wait(t) + end_time = time_ns() + wall_time_delta = end_time - start_time + @test t.metrics_enabled + @test Base.Experimental.task_cpu_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_cpu_time_ns(t) + @test wall_time_delta > Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "disabled" begin + t = Threads.@spawn peakflops() + wait(t) + @test !t.metrics_enabled + @test isnothing(Base.Experimental.task_cpu_time_ns(t)) + @test isnothing(Base.Experimental.task_wall_time_ns(t)) + end + @testset "task not run" begin + t1 = Task(() -> nothing) + @test !t1.metrics_enabled + @test isnothing(Base.Experimental.task_cpu_time_ns(t1)) + @test isnothing(Base.Experimental.task_wall_time_ns(t1)) + try + Base.Experimental.task_metrics(true) + t2 = Task(() -> nothing) + @test t2.metrics_enabled + @test Base.Experimental.task_cpu_time_ns(t2) == 0 + @test Base.Experimental.task_wall_time_ns(t2) == 0 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "task failure" begin + try + Base.Experimental.task_metrics(true) + t = Threads.@spawn error("this task failed") + @test_throws "this task failed" wait(t) + @test Base.Experimental.task_cpu_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_cpu_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "direct yield(t)" begin + try + Base.Experimental.task_metrics(true) + start = time_ns() + t_outer = Threads.@spawn begin + t_inner = Task(() -> peakflops()) + t_inner.sticky = false + # directly yield to `t_inner` rather calling `schedule(t_inner)` + yield(t_inner) + wait(t_inner) + @test Base.Experimental.task_cpu_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) >= Base.Experimental.task_cpu_time_ns(t_inner) + end + wait(t_outer) + delta = time_ns() - start + @test Base.Experimental.task_cpu_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) >= Base.Experimental.task_cpu_time_ns(t_outer) + @test Base.Experimental.task_wall_time_ns(t_outer) < delta + finally + Base.Experimental.task_metrics(false) + end + end + @testset "bad schedule" begin + try + Base.Experimental.task_metrics(true) + t1 = Task((x) -> 1) + schedule(t1) # MethodError + yield() + @assert istaskfailed(t1) + @test Base.Experimental.task_cpu_time_ns(t1) > 0 + @test Base.Experimental.task_wall_time_ns(t1) > 0 + foo(a, b) = a + b + t2 = Task(() -> (peakflops(); foo(wait()))) + schedule(t2) + yield() + @assert istaskstarted(t1) && !istaskdone(t2) + cpu1 = Base.Experimental.task_cpu_time_ns(t2) + wall1 = Base.Experimental.task_wall_time_ns(t2) + @test cpu1 > 0 + @test wall1 > 0 + schedule(t2, 1) + yield() + @assert istaskfailed(t2) + @test Base.Experimental.task_cpu_time_ns(t2) > cpu1 + @test Base.Experimental.task_wall_time_ns(t2) > wall1 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "continuously update until task done" begin + try + Base.Experimental.task_metrics(true) + last_cpu_time = Ref(typemax(Int)) + last_wall_time = Ref(typemax(Int)) + t = Threads.@spawn begin + cpu_time = Base.Experimental.task_cpu_time_ns(current_task()) + wall_time = Base.Experimental.task_wall_time_ns(current_task()) + for _ in 1:5 + x = time_ns() + while time_ns() < x + 100 + end + new_cpu_time = Base.Experimental.task_cpu_time_ns(current_task()) + new_wall_time = Base.Experimental.task_wall_time_ns(current_task()) + @test new_cpu_time > cpu_time + @test new_wall_time > wall_time + cpu_time = new_cpu_time + wall_time = new_wall_time + end + last_cpu_time[] = cpu_time + last_wall_time[] = wall_time + end + wait(t) + final_cpu_time = Base.Experimental.task_cpu_time_ns(t) + final_wall_time = Base.Experimental.task_wall_time_ns(t) + @test last_cpu_time[] < final_cpu_time + @test last_wall_time[] < final_wall_time + # ensure many more tasks are run to make sure the counters are + # not being updated after a task is done e.g. only when a new task is found + @sync for _ in 1:Threads.nthreads() + Threads.@spawn rand() + end + @test final_cpu_time == Base.Experimental.task_cpu_time_ns(t) + @test final_wall_time == Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters: lots of spawns" begin + using Dates + try + Base.Experimental.task_metrics(true) + # create more tasks than we have threads. + # - all tasks must have: cpu time <= wall time + # - some tasks must have: cpu time < wall time + # - summing across all tasks we must have: total cpu time <= available cpu time + n_tasks = 2 * Threads.nthreads(:default) + cpu_times = Vector{UInt64}(undef, n_tasks) + wall_times = Vector{UInt64}(undef, n_tasks) + start_time = time_ns() + @sync begin + for i in 1:n_tasks + start_time_i = time_ns() + task_i = Threads.@spawn peakflops() + Threads.@spawn begin + wait(task_i) + end_time_i = time_ns() + wall_time_delta_i = end_time_i - start_time_i + cpu_times[$i] = cpu_time_i = Base.Experimental.task_cpu_time_ns(task_i) + wall_times[$i] = wall_time_i = Base.Experimental.task_wall_time_ns(task_i) + # task should have recorded some cpu-time and some wall-time + @test cpu_time_i > 0 + @test wall_time_i > 0 + # task cpu-time cannot be greater than its wall-time + @test wall_time_i >= cpu_time_i + # task wall-time must be less than our manually measured wall-time + # between calling `@spawn` and returning from `wait`. + @test wall_time_delta_i > wall_time_i + end + end + end + end_time = time_ns() + wall_time_delta = (end_time - start_time) + available_cpu_time = wall_time_delta * Threads.nthreads(:default) + summed_cpu_time = sum(cpu_times) + # total CPU time from all tasks can't exceed what was actually available. + @test available_cpu_time > summed_cpu_time + # some tasks must have cpu-time less than their wall-time, because we had more tasks + # than threads. + summed_wall_time = sum(wall_times) + @test summed_wall_time > summed_cpu_time + finally + Base.Experimental.task_metrics(false) + end +end + end # main testset