backport container restart support in loki source
wildum committed May 10, 2024
1 parent 2d14260 commit 985fc79
Showing 5 changed files with 134 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -87,6 +87,8 @@ Main (unreleased)
- Fix an issue on Windows where uninstalling Alloy did not remove it from the
  Add/Remove programs list. (@rfratto)

- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum)

### Other changes

- Clustering for Grafana Agent in Flow mode has graduated from beta to stable.
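For readers unfamiliar with the component named above, a minimal `loki.source.docker` pipeline might look like the following sketch. The component labels, Docker socket path, and Loki endpoint URL are illustrative placeholders rather than values taken from this commit:

// Discover containers from the local Docker daemon (placeholder socket path).
discovery.docker "containers" {
  host = "unix:///var/run/docker.sock"
}

// Tail logs from the discovered containers; this is the component whose
// restart handling is fixed by this change.
loki.source.docker "default" {
  host       = "unix:///var/run/docker.sock"
  targets    = discovery.docker.containers.targets
  forward_to = [loki.write.local.receiver]
}

// Forward the collected logs to a Loki instance (placeholder URL).
loki.write "local" {
  endpoint {
    url = "http://localhost:3100/loki/api/v1/push"
  }
}

With this fix, log collection from a target resumes once its container is reported as running again, instead of stopping permanently after the first container restart.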
7 changes: 4 additions & 3 deletions internal/component/loki/source/docker/docker.go
@@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) {
 	}
 
 	return &options{
-		client:    client,
-		handler:   loki.NewEntryHandler(c.handler.Chan(), func() {}),
-		positions: c.posFile,
+		client:                client,
+		handler:               loki.NewEntryHandler(c.handler.Chan(), func() {}),
+		positions:             c.posFile,
+		targetRestartInterval: 5 * time.Second,
 	}, nil
 }

105 changes: 105 additions & 0 deletions internal/component/loki/source/docker/docker_test.go
@@ -4,17 +4,32 @@ package docker

import (
	"context"
	"io"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/go-kit/log"
	"github.com/grafana/agent/internal/component"
	"github.com/grafana/agent/internal/component/common/loki/client/fake"
	"github.com/grafana/agent/internal/component/common/loki/positions"
	dt "github.com/grafana/agent/internal/component/loki/source/docker/internal/dockertarget"
	"github.com/grafana/agent/internal/flow/componenttest"
	"github.com/grafana/agent/internal/util"
	"github.com/grafana/river"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/relabel"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

const targetRestartInterval = 20 * time.Millisecond

func Test(t *testing.T) {
	// Use host that works on all platforms (including Windows).
	var cfg = `
@@ -73,3 +88,93 @@ func TestDuplicateTargets(t *testing.T) {

	require.Len(t, cmp.manager.tasks, 1)
}

func TestRestart(t *testing.T) {
	runningState := true
	client := clientMock{
		logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor",
		running: func() bool { return runningState },
	}
	expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor"

	tailer, entryHandler := setupTailer(t, client)
	go tailer.Run(context.Background())

	// The container is already running, expect log lines.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		logLines := entryHandler.Received()
		if assert.NotEmpty(c, logLines) {
			assert.Equal(c, expectedLogLine, logLines[0].Line)
		}
	}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.")

	// Stops the container.
	runningState = false
	time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines.
	entryHandler.Clear()
	time.Sleep(targetRestartInterval + 10*time.Millisecond)
	assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running.

	// Restart the container and expect log lines.
	runningState = true
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		logLines := entryHandler.Received()
		if assert.NotEmpty(c, logLines) {
			assert.Equal(c, expectedLogLine, logLines[0].Line)
		}
	}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.")
}

func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)
	entryHandler = fake.NewClient(func() {})

	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Second,
		PositionsFile: t.TempDir() + "/positions.yml",
	})
	require.NoError(t, err)

	tgt, err := dt.NewTarget(
		dt.NewMetrics(prometheus.NewRegistry()),
		logger,
		entryHandler,
		ps,
		"flog",
		model.LabelSet{"job": "docker"},
		[]*relabel.Config{},
		client,
	)
	require.NoError(t, err)
	tailerTask := &tailerTask{
		options: &options{
			client:                client,
			targetRestartInterval: targetRestartInterval,
		},
		target: tgt,
	}
	return newTailer(logger, tailerTask), entryHandler
}

type clientMock struct {
	client.APIClient
	logLine string
	running func() bool
}

func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) {
	return types.ContainerJSON{
		ContainerJSONBase: &types.ContainerJSONBase{
			ID: c,
			State: &types.ContainerState{
				Running: mock.running(),
			},
		},
		Config: &container.Config{Tty: true},
	}, nil
}

func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
	return io.NopCloser(strings.NewReader(mock.logLine)), nil
}
@@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() {
 		ctx, cancel := context.WithCancel(context.Background())
 		t.cancel = cancel
 		go t.processLoop(ctx)
-	} else {
-		level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName)
 	}
 }

41 changes: 23 additions & 18 deletions internal/component/loki/source/docker/runner.go
@@ -5,8 +5,8 @@ package docker
 import (
 	"context"
 	"sync"
+	"time"
 
-	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/client"
 	"github.com/go-kit/log"
 	"github.com/grafana/agent/internal/component/common/loki"
@@ -52,6 +52,9 @@ type options struct {

 	// positions interface so tailers can save/restore offsets in log files.
 	positions positions.Positions
+
+	// targetRestartInterval to restart task that has stopped running.
+	targetRestartInterval time.Duration
 }

// tailerTask is the payload used to create tailers. It implements runner.Task.
@@ -95,23 +98,25 @@ func newTailer(l log.Logger, task *tailerTask) *tailer {
 }
 
 func (t *tailer) Run(ctx context.Context) {
-	ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)
-
-	t.target.StartIfNotRunning()
-
-	select {
-	case err := <-chErr:
-		// Error setting up the Wait request from the client; either failed to
-		// read from /containers/{containerID}/wait, or couldn't parse the
-		// response. Stop the target and exit the task after logging; if it was
-		// a transient error, the target will be retried on the next discovery
-		// refresh.
-		level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
-		t.target.Stop()
-		return
-	case <-ch:
-		t.target.Stop()
-		return
+	ticker := time.NewTicker(t.opts.targetRestartInterval)
+	tickerC := ticker.C
+
+	for {
+		select {
+		case <-tickerC:
+			res, err := t.opts.client.ContainerInspect(ctx, t.target.Name())
+			if err != nil {
+				level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err)
+				continue
+			}
+			if res.State.Running {
+				t.target.StartIfNotRunning()
+			}
+		case <-ctx.Done():
+			t.target.Stop()
+			ticker.Stop()
+			return
+		}
 	}
 }

