diff --git a/src/internal/task/atomic-cooperative.go b/src/internal/task/atomic-cooperative.go new file mode 100644 index 0000000000..60eb917a8e --- /dev/null +++ b/src/internal/task/atomic-cooperative.go @@ -0,0 +1,41 @@ +package task + +// Atomics implementation for cooperative systems. The atomic types here aren't +// actually atomic, they assume that accesses cannot be interrupted by a +// different goroutine or interrupt happening at the same time. + +type atomicIntegerType interface { + uintptr | uint32 | uint64 +} + +type pseudoAtomic[T atomicIntegerType] struct { + v T +} + +func (x *pseudoAtomic[T]) Add(delta T) T { x.v += delta; return x.v } +func (x *pseudoAtomic[T]) Load() T { return x.v } +func (x *pseudoAtomic[T]) Store(val T) { x.v = val } +func (x *pseudoAtomic[T]) CompareAndSwap(old, new T) (swapped bool) { + if x.v != old { + return false + } + x.v = new + return true +} +func (x *pseudoAtomic[T]) Swap(new T) (old T) { + old = x.v + x.v = new + return +} + +// Uintptr is an atomic uintptr when multithreading is enabled, and a plain old +// uintptr otherwise. +type Uintptr = pseudoAtomic[uintptr] + +// Uint32 is an atomic uint32 when multithreading is enabled, and a plain old +// uint32 otherwise. +type Uint32 = pseudoAtomic[uint32] + +// Uint64 is an atomic uint64 when multithreading is enabled, and a plain old +// uint64 otherwise. +type Uint64 = pseudoAtomic[uint64] diff --git a/src/internal/task/futex-cooperative.go b/src/internal/task/futex-cooperative.go new file mode 100644 index 0000000000..8351f88774 --- /dev/null +++ b/src/internal/task/futex-cooperative.go @@ -0,0 +1,44 @@ +package task + +// A futex is a way for userspace to wait with the pointer as the key, and for +// another thread to wake one or all waiting threads keyed on the same pointer. +// +// A futex does not change the underlying value, it only reads it before to prevent +// lost wake-ups. +type Futex struct { + Uint32 + waiters Stack +} + +// Atomically check for cmp to still be equal to the futex value and if so, go +// to sleep. Return true if we were definitely awoken by a call to Wake or +// WakeAll, and false if we can't be sure of that. +func (f *Futex) Wait(cmp uint32) (awoken bool) { + if f.Uint32.v != cmp { + return false + } + + // Push the current goroutine onto the waiter stack. + f.waiters.Push(Current()) + + // Pause until the waiters are awoken by Wake/WakeAll. + Pause() + + // We were awoken by a call to Wake or WakeAll. There is no chance for + // spurious wakeups. + return true +} + +// Wake a single waiter. +func (f *Futex) Wake() { + if t := f.waiters.Pop(); t != nil { + scheduleTask(t) + } +} + +// Wake all waiters. +func (f *Futex) WakeAll() { + for t := f.waiters.Pop(); t != nil; t = f.waiters.Pop() { + scheduleTask(t) + } +} diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index 72ef24c809..c73da3ecd4 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -3,35 +3,62 @@ package sync import "internal/task" type WaitGroup struct { - counter uint - waiters task.Stack + futex task.Futex } func (wg *WaitGroup) Add(delta int) { if delta > 0 { // Check for overflow. - if uint(delta) > (^uint(0))-wg.counter { - panic("sync: WaitGroup counter overflowed") - } + for { + counter := wg.futex.Load() + if uint32(delta) > (^uint32(0))-counter { + panic("sync: WaitGroup counter overflowed") + } - // Add to the counter. - wg.counter += uint(delta) - } else { - // Check for underflow. - if uint(-delta) > wg.counter { - panic("sync: negative WaitGroup counter") + // Add to the counter. + if wg.futex.CompareAndSwap(counter, counter+uint32(delta)) { + // Successfully added. + return + } } + } else { + for { + counter := wg.futex.Load() + + // Check for underflow. + if uint32(-delta) > counter { + panic("sync: negative WaitGroup counter") + } - // Subtract from the counter. - wg.counter -= uint(-delta) + // Subtract from the counter. + if !wg.futex.CompareAndSwap(counter, counter-uint32(-delta)) { + // Could not swap, trying again. + continue + } - // If the counter is zero, everything is done and the waiters should be resumed. - // This code assumes that the waiters cannot wake up until after this function returns. - // In the current implementation, this is always correct. - if wg.counter == 0 { - for t := wg.waiters.Pop(); t != nil; t = wg.waiters.Pop() { - scheduleTask(t) + // If the counter is zero, everything is done and the waiters should + // be resumed. + // When there are multiple thread, there is a chance for the counter + // to go to zero, WakeAll to be called, and then the counter to be + // incremented again before a waiting goroutine has a chance to + // check the new (zero) value. However the last increment is + // explicitly given in the docs as something that should not be + // done: + // + // > Note that calls with a positive delta that occur when the + // > counter is zero must happen before a Wait. + // + // So we're fine here. + if counter-uint32(-delta) == 0 { + // TODO: this is not the most efficient implementation possible + // because we wake up all waiters unconditionally, even if there + // might be none. Though since the common usage is for this to + // be called with at least one waiter, it's probably fine. + wg.futex.WakeAll() } + + // Successfully swapped (and woken all waiting tasks if needed). + return } } } @@ -41,14 +68,15 @@ func (wg *WaitGroup) Done() { } func (wg *WaitGroup) Wait() { - if wg.counter == 0 { - // Everything already finished. - return - } - - // Push the current goroutine onto the waiter stack. - wg.waiters.Push(task.Current()) + for { + counter := wg.futex.Load() + if counter == 0 { + return // everything already finished + } - // Pause until the waiters are awoken by Add/Done. - task.Pause() + if wg.futex.Wait(counter) { + // Successfully woken by WakeAll (in wg.Add). + break + } + } }