diff --git a/utils/queue.go b/utils/queue.go index dec1d6a..6192fdb 100644 --- a/utils/queue.go +++ b/utils/queue.go @@ -4,18 +4,16 @@ import ( "context" "errors" "sync" - "sync/atomic" "time" ) type FDQueue[S ~[]E, E any] struct { - ctx context.Context - close context.CancelFunc - timelimit time.Duration - ch chan E - active sync.WaitGroup - batchSize int - overflowed atomic.Bool + ctx context.Context + close context.CancelFunc + timelimit time.Duration + ch chan E + active sync.WaitGroup + batchSize int } var ErrClosed = errors.New("[chotki] feed/drain queue is closed") @@ -40,7 +38,7 @@ func (q *FDQueue[S, E]) Close() error { } func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { - if q.ctx.Err() != nil || q.overflowed.Load() { + if q.ctx.Err() != nil { return ErrClosed } q.active.Add(1) @@ -51,8 +49,6 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { break case <-q.ctx.Done(): break - case <-time.After(q.timelimit): - q.overflowed.Store(true) case q.ch <- pkg: } @@ -61,7 +57,7 @@ func (q *FDQueue[S, E]) Drain(ctx context.Context, recs S) error { } func (q *FDQueue[S, E]) Feed(ctx context.Context) (recs S, err error) { - if q.ctx.Err() != nil || q.overflowed.Load() { + if q.ctx.Err() != nil { return nil, ErrClosed } q.active.Add(1)