Skip to content

Commit

Permalink
Fix @spawn_or_run_task with interactive threads (#3385)
Browse files Browse the repository at this point in the history
When Julia ≥ 1.9 is started with `-tX,Y`, by default tasks use the pool
of interactive threads instead of the default pool. Fix this by updating
`spawn_or_run_task` and `spawn_or_run` to match the code used
by `@spawn` in Julia master.
  • Loading branch information
nalimilan authored Oct 10, 2023
1 parent b51a973 commit 87c2162
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 4
JULIA_NUM_THREADS: 4,1
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
Expand Down
7 changes: 7 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
keyword argument
([#3380](https://github.com/JuliaData/DataFrames.jl/pull/3380))

## Bug fixes

* Always use the default thread pool for multithreaded operations,
instead of using the interactive thread pool when Julia was started
with `-tM,N` with N > 0
([#3385](https://github.com/JuliaData/DataFrames.jl/pull/3385))

# DataFrames.jl v1.6.1 Release Notes

## Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion docs/src/lib/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ CurrentModule = DataFrames
## Multithreading support

By default, selected operations in DataFrames.jl automatically use multiple threads
when available. It is task-based and implemented using the `@spawn` macro from Julia Base.
when available. Multi-threading is task-based and implemented using the `@spawn`
macro from Julia Base. Tasks are therefore scheduled on the `:default` threadpool.
Functions that take user-defined functions and may run it in parallel
accept a `threads` keyword argument which allows disabling multithreading
when the provided function requires serial execution or is not thread-safe.
Expand Down
26 changes: 20 additions & 6 deletions src/other/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,23 @@ end
Equivalent to `Threads.@spawn` if `threads === true`,
otherwise run `expr` and return a `Task` that returns its value.
"""
macro spawn_or_run_task(threads, expr)
letargs = Base._lift_one_interp!(expr)
macro spawn_or_run_task(threads, ex)
letargs = Base._lift_one_interp!(ex)

thunk = esc(:(()->($expr)))
thunk = :(()->($(esc(ex))))
@static if VERSION >= v"1.10.0-DEV"
Base.replace_linenums!(thunk, __source__)
end
var = esc(Base.sync_varname)
spawn_set_thrpool = VERSION >= v"1.9.0" ?
:(Base.Threads._spawn_set_thrpool(task, :default)) :
:()
quote
let $(letargs...)
if $(esc(threads))
local task = Task($thunk)
task.sticky = false
$(spawn_set_thrpool)
else
# Run expr immediately
res = $thunk()
Expand All @@ -253,16 +260,23 @@ end
Equivalent to `Threads.@spawn` if `threads === true`,
otherwise run `expr`.
"""
macro spawn_or_run(threads, expr)
letargs = Base._lift_one_interp!(expr)
macro spawn_or_run(threads, ex)
letargs = Base._lift_one_interp!(ex)

thunk = esc(:(()->($expr)))
thunk = :(()->($(esc(ex))))
if VERSION >= v"1.10.0-DEV"
Base.replace_linenums!(thunk, __source__)
end
var = esc(Base.sync_varname)
spawn_set_thrpool = VERSION >= v"1.9.0" ?
:(Base.Threads._spawn_set_thrpool(task, :default)) :
:()
quote
let $(letargs...)
if $(esc(threads))
local task = Task($thunk)
task.sticky = false
$(spawn_set_thrpool)
if $(Expr(:islocal, var))
put!($var, task)
end
Expand Down

0 comments on commit 87c2162

Please sign in to comment.