diff --git a/pkg/runtime/processor/components.go b/pkg/runtime/processor/components.go index dc10543bfa2..0aa4efd53a2 100644 --- a/pkg/runtime/processor/components.go +++ b/pkg/runtime/processor/components.go @@ -144,13 +144,14 @@ func (p *Processor) AddPendingComponent(ctx context.Context, comp componentsapi. return false } - p.pendingComponentsWaiting.Add(1) + p.pendingComponentsWaiting.RLock() + select { case <-ctx.Done(): - p.pendingComponentsWaiting.Done() + p.pendingComponentsWaiting.RUnlock() return false case <-p.closedCh: - p.pendingComponentsWaiting.Done() + p.pendingComponentsWaiting.RUnlock() return false case p.pendingComponents <- comp: return true @@ -177,7 +178,7 @@ func (p *Processor) processComponents(ctx context.Context) error { for comp := range p.pendingComponents { err := process(comp) - p.pendingComponentsWaiting.Done() + p.pendingComponentsWaiting.RUnlock() if err != nil { return err } @@ -188,7 +189,8 @@ func (p *Processor) processComponents(ctx context.Context) error { // WaitForEmptyComponentQueue waits for the component queue to be empty. func (p *Processor) WaitForEmptyComponentQueue() { - p.pendingComponentsWaiting.Wait() + p.pendingComponentsWaiting.Lock() + defer p.pendingComponentsWaiting.Unlock() } func (p *Processor) processComponentAndDependents(ctx context.Context, comp componentsapi.Component) error { diff --git a/pkg/runtime/processor/processor.go b/pkg/runtime/processor/processor.go index 6680cea6ac8..d20a50225fa 100644 --- a/pkg/runtime/processor/processor.go +++ b/pkg/runtime/processor/processor.go @@ -125,7 +125,7 @@ type Processor struct { pendingHTTPEndpoints chan httpendpointsapi.HTTPEndpoint pendingComponents chan componentsapi.Component - pendingComponentsWaiting sync.WaitGroup + pendingComponentsWaiting sync.RWMutex pendingComponentDependents map[string][]componentsapi.Component subErrCh chan error diff --git a/pkg/runtime/processor/processor_test.go b/pkg/runtime/processor/processor_test.go index 7f360a1b74d..6d52516ea35 100644 --- a/pkg/runtime/processor/processor_test.go +++ b/pkg/runtime/processor/processor_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "strings" + "sync" "testing" "time" @@ -654,3 +655,67 @@ func TestReporter(t *testing.T) { } }) } + +func TestProcessorWaitGroupError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error) + t.Cleanup(func() { + cancel() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second * 5): + require.Fail(t, "timeout waiting for processor to return") + } + }) + proc, _ := newTestProc() + // spin up the processor + go func() { + errCh <- proc.Process(ctx) + }() + + comp1 := componentsapi.Component{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpubsub1", + }, + Spec: componentsapi.ComponentSpec{ + Type: "pubsub.mockPubSub", + Version: "v1", + Metadata: daprt.GetFakeMetadataItems(), + InitTimeout: "2", + IgnoreErrors: true, + }, + } + comp2 := componentsapi.Component{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpubsub2", + }, + Spec: componentsapi.ComponentSpec{ + Type: "pubsub.mockPubSub", + Version: "v1", + Metadata: daprt.GetFakeMetadataItems(), + InitTimeout: "2", + IgnoreErrors: true, + }, + } + + var wg sync.WaitGroup + wg.Add(10_000 * 2) + + for range 10_000 { + go func() { + if proc.AddPendingComponent(ctx, comp1) { + proc.WaitForEmptyComponentQueue() + wg.Done() + } + }() + go func() { + if proc.AddPendingComponent(ctx, comp2) { + proc.WaitForEmptyComponentQueue() + wg.Done() + } + }() + } + + wg.Wait() +}