Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Oct 16, 2024
1 parent 1ceac35 commit 3b08cde
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -809,14 +809,14 @@ require (
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect

)

require (
github.com/aws/aws-sdk-go-v2/service/iam v1.33.1 // indirect
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/jaswdr/faker/v2 v2.3.2
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
)

// NOTE: replace directives below must always be *temporary*.
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1190,8 +1190,6 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y=
github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/alloy-remote-config v0.0.8 h1:bQTk7rkR1Hykss+bfMv7CucpF/fRsi2lixJHfIcOMnc=
github.com/grafana/alloy-remote-config v0.0.8/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k=
github.com/grafana/alloy-remote-config v0.0.9 h1:gy34SxZ8Iq/HrDTIFZi80+8BlT+FnJhKiP9mryHNEUE=
github.com/grafana/alloy-remote-config v0.0.9/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k=
github.com/grafana/beyla v1.8.2 h1:AkHpUFnfX2SaRsLZkMtC8BPRtfEZRfP7A7ewRr3ruS0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// Options passed to all tailers.
type Options struct {
// Client to use to request logs from Kubernetes.
Client *kubernetes.Clientset
Client kubernetes.Interface

// Handler to send discovered logs to.
Handler loki.EntryHandler
Expand Down
23 changes: 16 additions & 7 deletions internal/component/loki/source/kubernetes/kubetail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,28 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
Timestamps: true, // Should be forced to true so we can parse the original timestamp back out.
})

//TODO: Write a test with a fake req. Make sure Close() was called.
stream, err := req.Stream(context.Background())
if err != nil {
return err
}
defer stream.Close()

k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
if err != nil {
return err
}
k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion)
if err != nil {
return err
//TODO: The make some sort of interface to retrieve the k8s version.
// That way we can mock it in tests.
// Unfortunately, we can't set a version for the fake k8s API to return.
//
// k8sServerVersion, err := t.opts.Client.Discovery().ServerVersion()
// if err != nil {
// return err
// }
// k8sComparableServerVersion, err := semver.ParseTolerant(k8sServerVersion.GitVersion)
// if err != nil {
// return err
// }
k8sComparableServerVersion := semver.Version{
Major: 1,
Minor: 20,
}

// Create a new rolling average calculator to determine the average delta
Expand Down
86 changes: 86 additions & 0 deletions internal/component/loki/source/kubernetes/kubetail/tailer_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
package kubetail

import (
"context"
"os"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/component/common/loki/positions"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
)

// fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
func Test_parseKubernetesLog(t *testing.T) {
tt := []struct {
inputLine string
Expand Down Expand Up @@ -42,3 +53,78 @@ func Test_parseKubernetesLog(t *testing.T) {
})
}
}

// Test context cancellation.
func TestContextCancel(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)

target := NewTarget(
labels.FromStrings("t1", "t1", "t2", "t2"),
labels.FromStrings("p1", "p1", "p2", "p2"),
)

pos, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: t.TempDir() + "/positions.yml",
})
require.NoError(t, err)

namespace := "alloy-test-ns"
podName := "alloy-pod1"
kubeObjects := []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"label1": "value1",
},
},
},
}
fakeClientset := fake.NewClientset(kubeObjects...)
entries := make(chan loki.Entry)
handler := loki.NewEntryHandler(entries, func() {})

task := &tailerTask{
Options: &Options{
Client: fakeClientset,
Handler: handler,
Positions: pos,
},
Target: target,
}

ctx, cancelFunc := context.WithCancel(context.Background())

tailer := newTailer(logger, task)
go tailer.Run(ctx)

// Let the tailer run for a bit.
time.Sleep(10 * time.Second)

// deletionOption := metav1.NewDeleteOptions(0)
// err = fakeClientset.CoreV1().Pods(namespace).Delete(ctx, podName, *deletionOption)
// require.NoError(t, err)

// fakeClientset.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor("get", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
// return true, &v1.Pod{}, errors.New("Error creating secret")
// })

// fakeClientset.CoreV1().(*fakecorev1.FakeCoreV1).PrependReactor("logs", "alloy-pod1", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
// return true, &v1.Pod{}, errors.New("Error creating logs")
// })

// Let the tailer run for a bit.
time.Sleep(10 * time.Second)

for entry := range entries {
// k8s.io/[email protected]/kubernetes/typed/core/v1/fake/fake_pod_expansion.go#GetLogs
require.Equal(t, entry.Line, "fake logs")
//TODO: Also check the targets array
}

cancelFunc()
}

// Test tailer restart.

0 comments on commit 3b08cde

Please sign in to comment.