Skip to content

Commit

Permalink
processor: fix waitgroup panic (dapr#8307)
Browse files Browse the repository at this point in the history
panic: sync: WaitGroup is reused before previous Wait has returned

Signed-off-by: Luis Rascao <[email protected]>
  • Loading branch information
lrascao authored Nov 28, 2024
1 parent fdd642e commit 24ac3b8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
12 changes: 7 additions & 5 deletions pkg/runtime/processor/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,14 @@ func (p *Processor) AddPendingComponent(ctx context.Context, comp componentsapi.
return false
}

p.pendingComponentsWaiting.Add(1)
p.pendingComponentsWaiting.RLock()

select {
case <-ctx.Done():
p.pendingComponentsWaiting.Done()
p.pendingComponentsWaiting.RUnlock()
return false
case <-p.closedCh:
p.pendingComponentsWaiting.Done()
p.pendingComponentsWaiting.RUnlock()
return false
case p.pendingComponents <- comp:
return true
Expand All @@ -177,7 +178,7 @@ func (p *Processor) processComponents(ctx context.Context) error {

for comp := range p.pendingComponents {
err := process(comp)
p.pendingComponentsWaiting.Done()
p.pendingComponentsWaiting.RUnlock()
if err != nil {
return err
}
Expand All @@ -188,7 +189,8 @@ func (p *Processor) processComponents(ctx context.Context) error {

// WaitForEmptyComponentQueue waits for the component queue to be empty.
func (p *Processor) WaitForEmptyComponentQueue() {
p.pendingComponentsWaiting.Wait()
p.pendingComponentsWaiting.Lock()
defer p.pendingComponentsWaiting.Unlock()
}

func (p *Processor) processComponentAndDependents(ctx context.Context, comp componentsapi.Component) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type Processor struct {

pendingHTTPEndpoints chan httpendpointsapi.HTTPEndpoint
pendingComponents chan componentsapi.Component
pendingComponentsWaiting sync.WaitGroup
pendingComponentsWaiting sync.RWMutex
pendingComponentDependents map[string][]componentsapi.Component
subErrCh chan error

Expand Down
65 changes: 65 additions & 0 deletions pkg/runtime/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -654,3 +655,67 @@ func TestReporter(t *testing.T) {
}
})
}

func TestProcessorWaitGroupError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error)
t.Cleanup(func() {
cancel()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(time.Second * 5):
require.Fail(t, "timeout waiting for processor to return")
}
})
proc, _ := newTestProc()
// spin up the processor
go func() {
errCh <- proc.Process(ctx)
}()

comp1 := componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "testpubsub1",
},
Spec: componentsapi.ComponentSpec{
Type: "pubsub.mockPubSub",
Version: "v1",
Metadata: daprt.GetFakeMetadataItems(),
InitTimeout: "2",
IgnoreErrors: true,
},
}
comp2 := componentsapi.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "testpubsub2",
},
Spec: componentsapi.ComponentSpec{
Type: "pubsub.mockPubSub",
Version: "v1",
Metadata: daprt.GetFakeMetadataItems(),
InitTimeout: "2",
IgnoreErrors: true,
},
}

var wg sync.WaitGroup
wg.Add(10_000 * 2)

for range 10_000 {
go func() {
if proc.AddPendingComponent(ctx, comp1) {
proc.WaitForEmptyComponentQueue()
wg.Done()
}
}()
go func() {
if proc.AddPendingComponent(ctx, comp2) {
proc.WaitForEmptyComponentQueue()
wg.Done()
}
}()
}

wg.Wait()
}

0 comments on commit 24ac3b8

Please sign in to comment.