diff --git a/go.mod b/go.mod index e06f1082ac..fe0ce486f3 100644 --- a/go.mod +++ b/go.mod @@ -809,7 +809,6 @@ require ( k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - ) require ( @@ -817,6 +816,7 @@ require ( github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/jaswdr/faker/v2 v2.3.2 + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect ) // NOTE: replace directives below must always be *temporary*. diff --git a/go.sum b/go.sum index dff2eda8cd..17ea845c86 100644 --- a/go.sum +++ b/go.sum @@ -1190,8 +1190,6 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y= github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= -github.com/grafana/alloy-remote-config v0.0.8 h1:bQTk7rkR1Hykss+bfMv7CucpF/fRsi2lixJHfIcOMnc= -github.com/grafana/alloy-remote-config v0.0.8/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k= github.com/grafana/alloy-remote-config v0.0.9 h1:gy34SxZ8Iq/HrDTIFZi80+8BlT+FnJhKiP9mryHNEUE= github.com/grafana/alloy-remote-config v0.0.9/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k= github.com/grafana/beyla v1.8.2 h1:AkHpUFnfX2SaRsLZkMtC8BPRtfEZRfP7A7ewRr3ruS0= diff --git a/internal/component/loki/source/kubernetes/kubetail/kubetail.go b/internal/component/loki/source/kubernetes/kubetail/kubetail.go index 744ce88f60..7d4333c396 100644 --- a/internal/component/loki/source/kubernetes/kubetail/kubetail.go +++ b/internal/component/loki/source/kubernetes/kubetail/kubetail.go @@ -16,7 +16,7 @@ import ( // Options passed to all tailers. type Options struct { // Client to use to request logs from Kubernetes. - Client *kubernetes.Clientset + Client kubernetes.Interface // Handler to send discovered logs to. Handler loki.EntryHandler diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer.go b/internal/component/loki/source/kubernetes/kubetail/tailer.go index 462ce8b61c..66616950cf 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer.go @@ -167,19 +167,28 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { Timestamps: true, // Should be forced to true so we can parse the original timestamp back out. }) + //TODO: Write a test with a fake req. Make sure Close() was called. stream, err := req.Stream(context.Background()) if err != nil { return err } defer stream.Close() - k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion() - if err != nil { - return err - } - k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion) - if err != nil { - return err + //TODO: The make some sort of interface to retrieve the k8s version. + // That way we can mock it in tests. + // Unfortunately, we can't set a version for the fake k8s API to return. + // + // k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion() + // if err != nil { + // return err + // } + // k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion) + // if err != nil { + // return err + // } + k8sComparableServerVersion := semver.Version{ + Major: 1, + Minor: 20, } // Create a new rolling average calculator to determine the average delta diff --git a/internal/component/loki/source/kubernetes/kubetail/tailer_test.go b/internal/component/loki/source/kubernetes/kubetail/tailer_test.go index 470b7133d0..59737e7d34 100644 --- a/internal/component/loki/source/kubernetes/kubetail/tailer_test.go +++ b/internal/component/loki/source/kubernetes/kubetail/tailer_test.go @@ -1,12 +1,23 @@ package kubetail import ( + "context" + "os" "testing" "time" + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/common/loki/positions" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" ) +// fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" func Test_parseKubernetesLog(t *testing.T) { tt := []struct { inputLine string @@ -42,3 +53,78 @@ func Test_parseKubernetesLog(t *testing.T) { }) } } + +// Test context cancellation. +func TestContextCancel(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + + target := NewTarget( + labels.FromStrings("t1", "t1", "t2", "t2"), + labels.FromStrings("p1", "p1", "p2", "p2"), + ) + + pos, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + namespace := "alloy-test-ns" + podName := "alloy-pod1" + kubeObjects := []runtime.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + Labels: map[string]string{ + "label1": "value1", + }, + }, + }, + } + fakeClientset := fake.NewClientset(kubeObjects...) + entries := make(chan loki.Entry) + handler := loki.NewEntryHandler(entries, func() {}) + + task := &tailerTask{ + Options: &Options{ + Client: fakeClientset, + Handler: handler, + Positions: pos, + }, + Target: target, + } + + ctx, cancelFunc := context.WithCancel(context.Background()) + + tailer := newTailer(logger, task) + go tailer.Run(ctx) + + // Let the tailer run for a bit. + time.Sleep(10 * time.Second) + + // deletionOption := metav1.NewDeleteOptions(0) + // err = fakeClientset.CoreV1().Pods(namespace).Delete(ctx, podName, *deletionOption) + // require.NoError(t, err) + + // fakeClientset.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor("get", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + // return true, &v1.Pod{}, errors.New("Error creating secret") + // }) + + // fakeClientset.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor("logs", "alloy-pod1", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + // return true, &v1.Pod{}, errors.New("Error creating logs") + // }) + + // Let the tailer run for a bit. + time.Sleep(10 * time.Second) + + for entry := range entries { + // k8s.io/client-go@v0.31.0/kubernetes/typed/core/v1/fake/fake_pod_expansion.go#GetLogs + require.Equal(t, entry.Line, "fake logs") + //TODO: Also check the targets array + } + + cancelFunc() +} + +// Test tailer restart.