From faf7a12fecccbc4a4c7107f3102bd85344293541 Mon Sep 17 00:00:00 2001 From: Termina1 Date: Tue, 17 Dec 2024 19:19:26 +0200 Subject: [PATCH] Revert "when que overflowed: wait a bit and stop accepting, return errors on feed" This reverts commit 364b3d76cabc68ffe264980a4c86bc43c50e9165. --- utils/queue.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/utils/queue.go b/utils/queue.go index dec1d6a..6192fdb 100644 --- a/utils/queue.go +++ b/utils/queue.go @@ -4,18 +4,16 @@ import ( "context" "errors" "sync" - "sync/atomic" "time" ) type FDQueue[S ~[]E, E any] struct { - ctx context.Context - close context.CancelFunc - timelimit time.Duration - ch chan E - active sync.WaitGroup - batchSize int - overflowed atomic.Bool + ctx context.Context + close context.CancelFunc + timelimit time.Duration + ch chan E + active sync.WaitGroup + batchSize int } var ErrClosed = errors.New("[chotki] feed/drain queue is closed") @@ -40,7 +38,7 @@ func (q *FDQueue[S, E]) Close() error { } func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { - if q.ctx.Err() != nil || q.overflowed.Load() { + if q.ctx.Err() != nil { return ErrClosed } q.active.Add(1) @@ -51,8 +49,6 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { break case <-q.ctx.Done(): break - case <-time.After(q.timelimit): - q.overflowed.Store(true) case q.ch <- pkg: } @@ -61,7 +57,7 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { } func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) { - if q.ctx.Err() != nil || q.overflowed.Load() { + if q.ctx.Err() != nil { return nil, ErrClosed } q.active.Add(1)