From 1ceac3542b83d3d925143bd87964195b2f9cdd3c Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Fri, 20 Sep 2024 17:09:19 +0100 Subject: [PATCH] Stop kubernetes log tailer gracefully --- .../loki/source/kubernetes/kubetail/tailer.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer.go b/internal/component/loki/source/kubernetes/kubetail/tailer.go index 1b647598d0..462ce8b61c 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer.go @@ -167,10 +167,11 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { Timestamps: true, // Should be forced to true so we can parse the original timestamp back out. }) - stream, err := req.Stream(ctx) + stream, err := req.Stream(context.Background()) if err != nil { return err } + defer stream.Close() k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion() if err != nil { @@ -207,7 +208,6 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { rolledFileTicker := time.NewTicker(1 * time.Second) defer func() { rolledFileTicker.Stop() - _ = stream.Close() }() for { select { @@ -222,16 +222,13 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { s := time.Since(last) if s > avg*3 { level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + + cancel() return } } } }() - } else { - go func() { - <-ctx.Done() - _ = stream.Close() - }() } level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)