Skip to content

Commit

Permalink
when que overflowed: wait a bit and stop accepting, return errors on …
Browse files Browse the repository at this point in the history
…feed
  • Loading branch information
Termina1 committed Dec 16, 2024
1 parent 7a66df1 commit 364b3d7
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions utils/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)

type FDQueue[S ~[]E, E any] struct {
ctx context.Context
close context.CancelFunc
timelimit time.Duration
ch chan E
active sync.WaitGroup
batchSize int
ctx context.Context
close context.CancelFunc
timelimit time.Duration
ch chan E
active sync.WaitGroup
batchSize int
overflowed atomic.Bool
}

var ErrClosed = errors.New("[chotki] feed/drain queue is closed")
Expand All @@ -38,7 +40,7 @@ func (q *FDQueue[S, E]) Close() error {
}

func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
if q.ctx.Err() != nil {
if q.ctx.Err() != nil || q.overflowed.Load() {
return ErrClosed
}
q.active.Add(1)
Expand All @@ -49,6 +51,8 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
break
case <-q.ctx.Done():
break
case <-time.After(q.timelimit):
q.overflowed.Store(true)
case q.ch <- pkg:
}

Expand All @@ -57,7 +61,7 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error {
}

func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) {
if q.ctx.Err() != nil {
if q.ctx.Err() != nil || q.overflowed.Load() {
return nil, ErrClosed
}
q.active.Add(1)
Expand Down

0 comments on commit 364b3d7

Please sign in to comment.