diff --git a/writer/writer.go b/writer/writer.go index 8fb8e4dc..3a3fb019 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -244,7 +244,23 @@ func (w *Writer) worker(ctx context.Context) { // open first file rotateCheck() - ticker := time.NewTicker(100 * time.Millisecond) + tickerC := make(chan struct{}, 1) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + select { + case tickerC <- struct{}{}: + // pass + case <-ctx.Done(): + return + } + } + } + }() write := func(b *RowBinary.WriteBuffer) { _, err := outBuf.Write(b.Body[:b.Used]) @@ -270,10 +286,8 @@ func (w *Writer) worker(ctx context.Context) { select { case b := <-w.inputChan: write(b) - case <-ticker.C: - if size > 0 { - rotateCheck() - } + case <-tickerC: + rotateCheck() case <-ctx.Done(): return default: // outBuf flush if nothing received @@ -284,6 +298,15 @@ func (w *Writer) worker(ctx context.Context) { w.logger.Error("CompWriter Flush() failed", zap.Error(err)) } } + + select { + case b := <-w.inputChan: + write(b) + case <-tickerC: + rotateCheck() + case <-ctx.Done(): + return + } } } }