Skip to content

Commit

Permalink
fix(plugin): handle multiple containers in kubectl cnpg logs (cloud…
Browse files Browse the repository at this point in the history
…native-pg#5931)

This patch fixes an issue in the `kubectl cnpg logs` command that leads
to a failure if the instance pod has more than one container.

Closes cloudnative-pg#5905

Signed-off-by: Marco Nenciarini <[email protected]>
Signed-off-by: Armando Ruocco <[email protected]>
Co-authored-by: Armando Ruocco <[email protected]>
  • Loading branch information
mnencia and armru authored Oct 23, 2024
1 parent 8ff0929 commit 1a70c90
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 25 deletions.
24 changes: 16 additions & 8 deletions pkg/utils/logs/cluster_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -46,7 +46,7 @@ const DefaultFollowWaiting time.Duration = 1 * time.Second
// streaming
type ClusterStreamingRequest struct {
Cluster *apiv1.Cluster
Options *v1.PodLogOptions
Options *corev1.PodLogOptions
Previous bool `json:"previous,omitempty"`
FollowWaiting time.Duration
// NOTE: the Client argument may be omitted, but it is good practice to pass it
Expand All @@ -62,14 +62,17 @@ func (csr *ClusterStreamingRequest) getClusterNamespace() string {
return csr.Cluster.Namespace
}

func (csr *ClusterStreamingRequest) getLogOptions(containerName string) *v1.PodLogOptions {
func (csr *ClusterStreamingRequest) getLogOptions(containerName string) *corev1.PodLogOptions {
if csr.Options == nil {
csr.Options = &v1.PodLogOptions{
return &corev1.PodLogOptions{
Container: containerName,
Previous: csr.Previous,
}
}
csr.Options.Previous = csr.Previous
return csr.Options
options := csr.Options.DeepCopy()
options.Container = containerName
options.Previous = csr.Previous
return options
}

func (csr *ClusterStreamingRequest) getKubernetesClient() kubernetes.Interface {
Expand Down Expand Up @@ -135,6 +138,8 @@ func (as *activeSet) add(name string) {

// has returns true if and only if name is active
func (as *activeSet) has(name string) bool {
as.m.Lock()
defer as.m.Unlock()
_, found := as.set[name]
return found
}
Expand All @@ -149,6 +154,8 @@ func (as *activeSet) drop(name string) {

// isZero checks if there are any active processes
func (as *activeSet) isZero() bool {
as.m.Lock()
defer as.m.Unlock()
return len(as.set) == 0
}

Expand All @@ -169,7 +176,7 @@ func (csr *ClusterStreamingRequest) SingleStream(ctx context.Context, writer io.

for {
var (
podList *v1.PodList
podList *corev1.PodList
err error
)
if isFirstScan || csr.Options.Follow {
Expand All @@ -189,6 +196,7 @@ func (csr *ClusterStreamingRequest) SingleStream(ctx context.Context, writer io.
return nil
}

wrappedWriter := safeWriterFrom(writer)
for _, pod := range podList.Items {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
Expand All @@ -204,7 +212,7 @@ func (csr *ClusterStreamingRequest) SingleStream(ctx context.Context, writer io.
container.Name,
client,
streamSet,
safeWriterFrom(writer),
wrappedWriter,
)
}
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/utils/logs/cluster_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

Expand All @@ -33,6 +33,23 @@ import (
. "github.com/onsi/gomega"
)

type syncBuffer struct {
b bytes.Buffer
m sync.Mutex
}

func (b *syncBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.b.Write(p)
}

func (b *syncBuffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.b.String()
}

var _ = Describe("Cluster logging tests", func() {
clusterNamespace := "cluster-test"
clusterName := "myTestCluster"
Expand All @@ -42,45 +59,45 @@ var _ = Describe("Cluster logging tests", func() {
Name: clusterName,
},
}
pod := &v1.Pod{
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: clusterName + "-1",
Labels: map[string]string{
utils.ClusterLabelName: clusterName,
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "postgresql",
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{},
},
},
},
},
}
podWithSidecars := &v1.Pod{
podWithSidecars := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: clusterName + "-1",
Labels: map[string]string{
utils.ClusterLabelName: clusterName,
},
},
Status: v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "postgresql",
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{},
},
},
{
Name: "sidecar",
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{},
},
},
},
Expand All @@ -96,7 +113,7 @@ var _ = Describe("Cluster logging tests", func() {
defer wait.Done()
streamClusterLogs := ClusterStreamingRequest{
Cluster: cluster,
Options: &v1.PodLogOptions{
Options: &corev1.PodLogOptions{
Follow: false,
},
Client: client,
Expand All @@ -119,7 +136,7 @@ var _ = Describe("Cluster logging tests", func() {
defer wait.Done()
streamClusterLogs := ClusterStreamingRequest{
Cluster: cluster,
Options: &v1.PodLogOptions{
Options: &corev1.PodLogOptions{
Follow: false,
},
Client: client,
Expand All @@ -134,7 +151,7 @@ var _ = Describe("Cluster logging tests", func() {

It("should catch extra logs if given the follow option", func(ctx context.Context) {
client := fake.NewSimpleClientset(pod)
var logBuffer bytes.Buffer
var logBuffer syncBuffer
// let's set a short follow-wait, and keep the cluster streaming for two
// cycles
followWaiting := 200 * time.Millisecond
Expand All @@ -143,7 +160,7 @@ var _ = Describe("Cluster logging tests", func() {
defer GinkgoRecover()
streamClusterLogs := ClusterStreamingRequest{
Cluster: cluster,
Options: &v1.PodLogOptions{
Options: &corev1.PodLogOptions{
Follow: true,
},
FollowWaiting: followWaiting,
Expand Down

0 comments on commit 1a70c90

Please sign in to comment.