Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pkg/cgo): solve data races when assigning/deleting and accessing handles #85

Merged
merged 2 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions pkg/cgo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ limitations under the License.

package cgo

import (
"fmt"
"sync/atomic"
"unsafe"
)

// Handle is an alternative implementation of cgo.Handle introduced by
// Go 1.17, see https://pkg.go.dev/runtime/cgo. This implementation
// optimizes performance in use cases related to plugins. It is intended
Expand All @@ -43,12 +49,18 @@ package cgo
// The usage in other contexts is discuraged.
type Handle uintptr

// MaxHandle is the largest value that an Handle can hold
const MaxHandle = 256 - 1
const (
// MaxHandle is the largest value that an Handle can hold
MaxHandle = 256 - 1

// max number of times we're willing to iterate over the vector of reusable
// handles to do compare-and-swap before giving up
maxNewHandleRounds = 20
)

var (
handles [MaxHandle + 1]interface{} // [int]interface{}
noHandle int = 0
handles [MaxHandle + 1]unsafe.Pointer // [int]*interface{}
noHandle unsafe.Pointer = nil
)

func init() {
Expand All @@ -72,24 +84,41 @@ func init() {
//
// This function is not thread-safe.
func NewHandle(v interface{}) Handle {
for i := 1; i <= MaxHandle; i++ {
if handles[i] == &noHandle {
handles[i] = v
return Handle(i)
rounds := 0
for h := uintptr(1); ; h++ {
// we acquired ownership of an handle, return it
// note: we attempt accessing slots 1..MaxHandle (included)
if atomic.CompareAndSwapPointer(&handles[h], noHandle, (unsafe.Pointer)(&v)) {
return Handle(h)
}

// we haven't acquired a handle, but we can try with the next one
if h < MaxHandle {
continue
}

// we iterated over the whole vector of handles, so we get back to start
// and try again with another round. Once we do this too many times,
// we have no choice if not panic-ing
h = uintptr(0) // note: will be incremented when continuing
if rounds < maxNewHandleRounds {
rounds++
continue
}

panic(fmt.Sprintf("plugin-sdk-go/cgo: could not obtain a new handle after round #%d", rounds))
}
panic("plugin-sdk-go/cgo: ran out of handle space")
}

// Value returns the associated Go value for a valid handle.
//
// The method panics if the handle is invalid.
// This function is not thread-safe.
func (h Handle) Value() interface{} {
if h > MaxHandle || handles[h] == &noHandle {
panic("plugin-sdk-go/cgo: misuse of an invalid Handle")
if h > MaxHandle || atomic.LoadPointer(&handles[h]) == noHandle {
panic(fmt.Sprintf("plugin-sdk-go/cgo: misuse (value) of an invalid Handle %d", h))
}
return handles[h]
return *(*interface{})(atomic.LoadPointer(&handles[h]))
}

// Delete invalidates a handle. This method should only be called once
Expand All @@ -99,14 +128,14 @@ func (h Handle) Value() interface{} {
// The method panics if the handle is invalid.
// This function is not thread-safe.
func (h Handle) Delete() {
if h > MaxHandle || handles[h] == &noHandle {
panic("plugin-sdk-go/cgo: misuse of an invalid Handle")
if h > MaxHandle || atomic.LoadPointer(&handles[h]) == noHandle {
panic(fmt.Sprintf("plugin-sdk-go/cgo: misuse (delete) of an invalid Handle %d", h))
}
handles[h] = &noHandle
atomic.StorePointer(&handles[h], noHandle)
}

func resetHandles() {
for i := 0; i <= MaxHandle; i++ {
handles[i] = &noHandle
atomic.StorePointer(&handles[i], noHandle)
}
}
3 changes: 2 additions & 1 deletion pkg/cgo/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cgo

import (
"reflect"
"sync/atomic"
"testing"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ func TestHandle(t *testing.T) {

siz := 0
for i := 0; i < MaxHandle; i++ {
if handles[i] != &noHandle {
if atomic.LoadPointer(&handles[i]) != noHandle {
siz++
}
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/sdk/symbols/extract/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ const (
// asyncBatchSize is the physical size of batches allocated
// in C memory, namely the total number of locks available
asyncBatchSize = cgo.MaxHandle + 1
//
// max number of seconds we're willing to wait for a worker to exit
// once released before triggering a panic
workerReleaseTimeoutInSeconds = 10
)

var (
Expand Down Expand Up @@ -186,7 +190,9 @@ func (a *asyncContext) acquireWorker(workerIdx int32) {
for _, i := range batchIdxs {
// reduce sync overhead by skipping unused batch slots
if i > a.maxBatchIdx {
continue
// from this point on we'll only encountered unused slots
// so we mind as well just start over
break
}

// check for incoming request, if any, otherwise busy waits
Expand Down Expand Up @@ -244,13 +250,23 @@ func (a *asyncContext) releaseWorker(workerIdx int32) {
// side, we use the first visible slot and set an exit request. The worker
// will eventually synchronize with the used lock and stop.
idx := a.workerIdxToBatchIdxs(workerIdx)[0]
waitStartTime := time.Now()
for !atomic.CompareAndSwapInt32((*int32)(&a.batch[idx].lock), state_unused, state_exit_req) {
// spin
// spinning, but let's yield first
runtime.Gosched()
if time.Since(waitStartTime).Seconds() > workerReleaseTimeoutInSeconds {
panic("plugin-sdk-go/sdk/symbols/extract: async worker release timeout expired (1)")
}
}

// wait for worker exiting
waitStartTime = time.Now()
for atomic.LoadInt32((*int32)(&a.batch[idx].lock)) != state_exit_ack {
// spin
// spinning, but let's yield first
runtime.Gosched()
if time.Since(waitStartTime).Seconds() > workerReleaseTimeoutInSeconds {
panic("plugin-sdk-go/sdk/symbols/extract: async worker release timeout expired (2)")
}
}

// restore first worker slot
Expand Down
Loading