Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ability to fetch logs from instances #564

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions e2e/basic/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package basic

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/celestiaorg/knuu/pkg/instance"
"github.com/celestiaorg/knuu/pkg/system"
)

const expectedLogMsg = "Hello World"

type sidecarLogsTest struct {
instance *instance.Instance
}

var _ instance.SidecarManager = (*sidecarLogsTest)(nil)

func (s *Suite) TestLogs() {
const namePrefix = "logs"
ctx := context.Background()

// Create a new instance
target, err := s.Knuu.NewInstance(namePrefix + "-target")
s.Require().NoError(err)

// Set the image and start command to generate logs
s.Require().NoError(target.Build().SetImage(ctx, alpineImage))
s.Require().NoError(target.Build().SetStartCommand("sh", "-c", fmt.Sprintf("while true; do echo '%s'; sleep 1; done", expectedLogMsg)))
s.Require().NoError(target.Build().Commit(ctx))
s.Require().NoError(target.Execution().Start(ctx))

// Wait for a short duration to allow log generation
time.Sleep(5 * time.Second)

logStream, err := target.Monitoring().Logs(ctx)
s.Require().NoError(err)
defer logStream.Close()

logs, err := io.ReadAll(logStream)
s.Require().NoError(err)

logOutput := string(logs)
s.Contains(logOutput, expectedLogMsg)
}

func (s *Suite) TestLogsWithSidecar() {
smuu marked this conversation as resolved.
Show resolved Hide resolved
const namePrefix = "logs-sidecar"
ctx := context.Background()

// Create a new instance
target, err := s.Knuu.NewInstance(namePrefix + "-target")
s.Require().NoError(err)

sidecar := &sidecarLogsTest{}

s.Require().NoError(target.Build().SetImage(ctx, alpineImage))
s.Require().NoError(target.Build().SetStartCommand("sh", "-c", "sleep infinity"))
s.Require().NoError(target.Build().Commit(ctx))
s.Require().NoError(target.Sidecars().Add(ctx, sidecar))
s.Require().NoError(target.Execution().Start(ctx))

// Wait for a short duration to allow log generation
time.Sleep(5 * time.Second)

logStream, err := sidecar.Instance().Monitoring().Logs(ctx)
s.Require().NoError(err)
defer logStream.Close()

logs, err := io.ReadAll(logStream)
s.Require().NoError(err)

logOutput := string(logs)
s.Contains(logOutput, expectedLogMsg)
}

func (sl *sidecarLogsTest) Initialize(ctx context.Context, namePrefix string, sysDeps *system.SystemDependencies) error {
var err error
sl.instance, err = instance.New(namePrefix+"-sidecar-logs", sysDeps)
if err != nil {
return err
}
sl.instance.Sidecars().SetIsSidecar(true)

if err := sl.instance.Build().SetImage(ctx, alpineImage); err != nil {
return err
}

err = sl.instance.Build().SetStartCommand("sh", "-c", fmt.Sprintf("while true; do echo '%s'; sleep 1; done", expectedLogMsg))
if err != nil {
return err
}

if err := sl.instance.Build().Commit(ctx); err != nil {
return err
}
return nil
}

func (sl *sidecarLogsTest) PreStart(ctx context.Context) error {
if sl.instance == nil {
return errors.New("instance not initialized")
}
return nil
}

func (sl *sidecarLogsTest) Instance() *instance.Instance {
return sl.instance
}

func (sl *sidecarLogsTest) Clone(namePrefix string) (instance.SidecarManager, error) {
clone, err := sl.instance.CloneWithName(namePrefix + "-" + sl.instance.Name())
if err != nil {
return nil, err
}
return &sidecarLogsTest{
instance: clone,
}, nil
}
10 changes: 10 additions & 0 deletions pkg/instance/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package instance

import (
"context"
"io"

"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -16,6 +19,13 @@ func (i *Instance) Monitoring() *monitoring {
return i.monitoring
}

func (m *monitoring) Logs(ctx context.Context) (io.ReadCloser, error) {
if m.instance.sidecars.IsSidecar() {
return m.instance.K8sClient.GetLogStream(ctx, m.instance.parentInstance.Name(), m.instance.Name())
}
return m.instance.K8sClient.GetLogStream(ctx, m.instance.Name(), m.instance.Name())
}

// SetLivenessProbe sets the liveness probe of the instance
// A live probe is a probe that is used to determine if the instance is still alive, and should be restarted if not
// See usage documentation: https://pkg.go.dev/i.K8sCli.io/api/core/[email protected]#Probe
Expand Down
23 changes: 23 additions & 0 deletions pkg/k8s/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package k8s

import (
"context"
"io"

v1 "k8s.io/api/core/v1"
)

func (c *Client) GetLogStream(ctx context.Context, replicaSetName string, containerName string) (io.ReadCloser, error) {
logOptions := &v1.PodLogOptions{}
if containerName != "" {
logOptions.Container = containerName
}

pod, err := c.GetFirstPodFromReplicaSet(ctx, replicaSetName)
if err != nil {
return nil, err
}

req := c.Clientset().CoreV1().Pods(c.Namespace()).GetLogs(pod.Name, logOptions)
return req.Stream(ctx)
}
2 changes: 2 additions & 0 deletions pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package k8s

import (
"context"
"io"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,6 +51,7 @@ type KubeManager interface {
GetConfigMap(ctx context.Context, name string) (*corev1.ConfigMap, error)
GetDaemonSet(ctx context.Context, name string) (*appv1.DaemonSet, error)
GetFirstPodFromReplicaSet(ctx context.Context, name string) (*corev1.Pod, error)
GetLogStream(ctx context.Context, podName string, containerName string) (io.ReadCloser, error)
GetNamespace(ctx context.Context, name string) (*corev1.Namespace, error)
GetNetworkPolicy(ctx context.Context, name string) (*netv1.NetworkPolicy, error)
GetService(ctx context.Context, name string) (*corev1.Service, error)
Expand Down
Loading