Skip to content

Commit

Permalink
better testing
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Nov 26, 2024
1 parent f08bd7c commit e6d7036
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
7 changes: 5 additions & 2 deletions internal/worker/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/marusama/semaphore/v2"
)
Expand Down Expand Up @@ -56,14 +57,16 @@ func NewPermit(initCount int) Permit {
}

// Acquire is blocking until a permit is acquired or returns error after context is done
// Remember to call Release(count) to release the permit after usage
func (p *permit) Acquire(ctx context.Context, count int) error {
if err := p.sem.Acquire(ctx, count); err != nil {
return fmt.Errorf("failed to acquire permit before context is done: %w", err)
}
return nil
}

// AcquireChan returns a permit ready channel. It's closed then permit is acquired
// AcquireChan returns a permit ready channel. Similar to Acquire, but non-blocking.
// Remember to call Release(1) to release the permit after usage
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
ch := make(chan struct{})
wg.Add(1)
Expand All @@ -74,7 +77,7 @@ func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan str
}
select { // try to send to channel, but don't block if listener is gone
case ch <- struct{}{}:
default:
case <-time.After(10 * time.Millisecond): // wait time is needed to avoid race condition of channel sending
p.sem.Release(1)

Check warning on line 81 in internal/worker/concurrency.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/concurrency.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}
}()
Expand Down
75 changes: 42 additions & 33 deletions internal/worker/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,58 @@ func TestPermit_Simulation(t *testing.T) {
tests := []struct {
name string
capacity []int // update every 50ms
goroutines int // each would block on acquiring 2-6 tokens for 100ms
goroutinesAcquireChan int // each would block using AcquireChan for 100ms
goroutines int // each would block on acquiring 1-2 tokens for 100-150ms
goroutinesAcquireChan int // each would block using AcquireChan for 100-150ms
maxTestDuration time.Duration
expectFailures int
expectFailuresAtLeast int
expectFailuresRange []int // range of failures, inclusive [min, max]
}{
{
name: "enough permit, no blocking",
maxTestDuration: 100 * time.Millisecond,
capacity: []int{1000},
maxTestDuration: 200 * time.Millisecond, // at most need 150 ms, add 50 ms buffer
capacity: []int{10000},
goroutines: 100,
goroutinesAcquireChan: 100,
expectFailures: 0,
expectFailuresRange: []int{0, 0},
},
{
name: "not enough permit, blocking but all acquire",
maxTestDuration: 1 * time.Second,
maxTestDuration: 1200 * time.Millisecond, // at most need 150ms * (1000 + 500) / 200 = 1125ms to acquire all permit
capacity: []int{200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailures: 0,
goroutines: 500, // at most 1000 tokens
goroutinesAcquireChan: 500, // 500 tokens
expectFailuresRange: []int{0, 0},
},
{
name: "not enough permit for some to acquire, fail some",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{100},
maxTestDuration: 250 * time.Millisecond, // at least need 100ms * (500 + 500) / 200 = 250ms to acquire all permit
capacity: []int{200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresAtLeast: 1,
expectFailuresRange: []int{1, 999}, // should at least pass some acquires
},
{
name: "not enough permit at beginning but due to capacity change, blocking but all acquire",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{100, 300, 500},
maxTestDuration: 250 * time.Millisecond,
capacity: []int{200, 400, 600},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresRange: []int{0, 0},
},
{
name: "enough permit at beginning but due to capacity change, some would fail",
maxTestDuration: 250 * time.Millisecond,
capacity: []int{600, 400, 200},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailures: 0,
expectFailuresRange: []int{1, 999},
},
{
name: "not enough permit for any acquire, fail all",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{0},
goroutines: 1000,
expectFailures: 1000,
name: "not enough permit for any acquire, fail all",
maxTestDuration: 300 * time.Millisecond,
capacity: []int{0},
goroutines: 500,
goroutinesAcquireChan: 500,
expectFailuresRange: []int{1000, 1000},
},
}

Expand All @@ -102,13 +110,12 @@ func TestPermit_Simulation(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
num := rand.Intn(2) + 2
// num := 1
num := rand.Intn(2) + 1
if err := permit.Acquire(ctx, num); err != nil {
failures.Add(1)
failures.Inc()
return
}
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond)
permit.Release(num)
}()
}
Expand All @@ -118,22 +125,24 @@ func TestPermit_Simulation(t *testing.T) {
defer wg.Done()
select {
case <-permit.AcquireChan(ctx, wg):
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond)
permit.Release(1)
case <-ctx.Done():
failures.Add(1)
failures.Inc()
}
}()
}

wg.Wait()
// sanity check
assert.Equal(t, 0, permit.Count())
if tt.expectFailuresAtLeast > 0 {
assert.LessOrEqual(t, tt.expectFailuresAtLeast, int(failures.Load()))
} else {
assert.Equal(t, tt.expectFailures, int(failures.Load()))
}
assert.Equal(t, tt.capacity[len(tt.capacity)-1], permit.Quota())

// expect failures in range
expectFailureMin := tt.expectFailuresRange[0]
expectFailureMax := tt.expectFailuresRange[1]
assert.GreaterOrEqual(t, int(failures.Load()), expectFailureMin)
assert.LessOrEqual(t, int(failures.Load()), expectFailureMax)
})
}
}

0 comments on commit e6d7036

Please sign in to comment.