Skip to content

Commit

Permalink
Stop kubernetes log tailer gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Oct 16, 2024
1 parent dd16550 commit 1ceac35
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions internal/component/loki/source/kubernetes/kubetail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
Timestamps: true, // Should be forced to true so we can parse the original timestamp back out.
})

stream, err := req.Stream(ctx)
stream, err := req.Stream(context.Background())
if err != nil {
return err
}
defer stream.Close()

k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
if err != nil {
Expand Down Expand Up @@ -207,7 +208,6 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
rolledFileTicker := time.NewTicker(1 * time.Second)
defer func() {
rolledFileTicker.Stop()
_ = stream.Close()
}()
for {
select {
Expand All @@ -222,16 +222,13 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
s := time.Since(last)
if s > avg*3 {
level.Debug(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)

cancel()
return
}
}
}
}()
} else {
go func() {
<-ctx.Done()
_ = stream.Close()
}()
}

level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)
Expand Down

0 comments on commit 1ceac35

Please sign in to comment.