Skip to content

Commit

Permalink
feat: ability to fetch logs from instances (#564)
Browse files Browse the repository at this point in the history
* feat: ability to fetch logs from instances

* fix: sidecar fetching logs test
  • Loading branch information
mojtaba-esk authored Sep 23, 2024
1 parent 4ede783 commit 81328f6
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
122 changes: 122 additions & 0 deletions e2e/basic/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package basic

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/celestiaorg/knuu/pkg/instance"
"github.com/celestiaorg/knuu/pkg/system"
)

const expectedLogMsg = "Hello World"

type sidecarLogsTest struct {
instance *instance.Instance
}

var _ instance.SidecarManager = (*sidecarLogsTest)(nil)

func (s *Suite) TestLogs() {
const namePrefix = "logs"
ctx := context.Background()

// Create a new instance
target, err := s.Knuu.NewInstance(namePrefix + "-target")
s.Require().NoError(err)

// Set the image and start command to generate logs
s.Require().NoError(target.Build().SetImage(ctx, alpineImage))
s.Require().NoError(target.Build().SetStartCommand("sh", "-c", fmt.Sprintf("while true; do echo '%s'; sleep 1; done", expectedLogMsg)))
s.Require().NoError(target.Build().Commit(ctx))
s.Require().NoError(target.Execution().Start(ctx))

// Wait for a short duration to allow log generation
time.Sleep(5 * time.Second)

logStream, err := target.Monitoring().Logs(ctx)
s.Require().NoError(err)
defer logStream.Close()

logs, err := io.ReadAll(logStream)
s.Require().NoError(err)

logOutput := string(logs)
s.Contains(logOutput, expectedLogMsg)
}

func (s *Suite) TestLogsWithSidecar() {
const namePrefix = "logs-sidecar"
ctx := context.Background()

// Create a new instance
target, err := s.Knuu.NewInstance(namePrefix + "-target")
s.Require().NoError(err)

sidecar := &sidecarLogsTest{}

s.Require().NoError(target.Build().SetImage(ctx, alpineImage))
s.Require().NoError(target.Build().SetStartCommand("sh", "-c", "sleep infinity"))
s.Require().NoError(target.Build().Commit(ctx))
s.Require().NoError(target.Sidecars().Add(ctx, sidecar))
s.Require().NoError(target.Execution().Start(ctx))

// Wait for a short duration to allow log generation
time.Sleep(5 * time.Second)

logStream, err := sidecar.Instance().Monitoring().Logs(ctx)
s.Require().NoError(err)
defer logStream.Close()

logs, err := io.ReadAll(logStream)
s.Require().NoError(err)

logOutput := string(logs)
s.Contains(logOutput, expectedLogMsg)
}

func (sl *sidecarLogsTest) Initialize(ctx context.Context, namePrefix string, sysDeps *system.SystemDependencies) error {
var err error
sl.instance, err = instance.New(namePrefix+"-sidecar-logs", sysDeps)
if err != nil {
return err
}
sl.instance.Sidecars().SetIsSidecar(true)

if err := sl.instance.Build().SetImage(ctx, alpineImage); err != nil {
return err
}

err = sl.instance.Build().SetStartCommand("sh", "-c", fmt.Sprintf("while true; do echo '%s'; sleep 1; done", expectedLogMsg))
if err != nil {
return err
}

if err := sl.instance.Build().Commit(ctx); err != nil {
return err
}
return nil
}

func (sl *sidecarLogsTest) PreStart(ctx context.Context) error {
if sl.instance == nil {
return errors.New("instance not initialized")
}
return nil
}

func (sl *sidecarLogsTest) Instance() *instance.Instance {
return sl.instance
}

func (sl *sidecarLogsTest) Clone(namePrefix string) (instance.SidecarManager, error) {
clone, err := sl.instance.CloneWithName(namePrefix + "-" + sl.instance.Name())
if err != nil {
return nil, err
}
return &sidecarLogsTest{
instance: clone,
}, nil
}
10 changes: 10 additions & 0 deletions pkg/instance/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package instance

import (
"context"
"io"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -16,6 +19,13 @@ func (i *Instance) Monitoring() *monitoring {
return i.monitoring
}

func (m *monitoring) Logs(ctx context.Context) (io.ReadCloser, error) {
if m.instance.sidecars.IsSidecar() {
return m.instance.K8sClient.GetLogStream(ctx, m.instance.parentInstance.Name(), m.instance.Name())
}
return m.instance.K8sClient.GetLogStream(ctx, m.instance.Name(), m.instance.Name())
}

// SetLivenessProbe sets the liveness probe of the instance
// A live probe is a probe that is used to determine if the instance is still alive, and should be restarted if not
// See usage documentation: https://pkg.go.dev/i.K8sCli.io/api/core/[email protected]#Probe
Expand Down
23 changes: 23 additions & 0 deletions pkg/k8s/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package k8s

import (
"context"
"io"

v1 "k8s.io/api/core/v1"
)

func (c *Client) GetLogStream(ctx context.Context, replicaSetName string, containerName string) (io.ReadCloser, error) {
logOptions := &v1.PodLogOptions{}
if containerName != "" {
logOptions.Container = containerName
}

pod, err := c.GetFirstPodFromReplicaSet(ctx, replicaSetName)
if err != nil {
return nil, err
}

req := c.Clientset().CoreV1().Pods(c.Namespace()).GetLogs(pod.Name, logOptions)
return req.Stream(ctx)
}
2 changes: 2 additions & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package k8s

import (
"context"
"io"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,6 +51,7 @@ type KubeManager interface {
GetConfigMap(ctx context.Context, name string) (*corev1.ConfigMap, error)
GetDaemonSet(ctx context.Context, name string) (*appv1.DaemonSet, error)
GetFirstPodFromReplicaSet(ctx context.Context, name string) (*corev1.Pod, error)
GetLogStream(ctx context.Context, podName string, containerName string) (io.ReadCloser, error)
GetNamespace(ctx context.Context, name string) (*corev1.Namespace, error)
GetNetworkPolicy(ctx context.Context, name string) (*netv1.NetworkPolicy, error)
GetService(ctx context.Context, name string) (*corev1.Service, error)
Expand Down

0 comments on commit 81328f6

Please sign in to comment.