From 38da8c608dbc10d60d592d80c726492f811d510d Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Mon, 15 Jan 2024 16:27:48 +0100 Subject: [PATCH] Check Pulled events for airgap inttests This is an additional safety net to ensure that there are no unwanted image pulls during the inttests, in case the network airgapping is ineffective. Signed-off-by: Tom Wieczorek --- inttest/airgap/airgap_test.go | 57 +++++++++++++++++++++++++---- inttest/ap-airgap/airgap_test.go | 63 +++++++++++++++++++++++++------- 2 files changed, 100 insertions(+), 20 deletions(-) diff --git a/inttest/airgap/airgap_test.go b/inttest/airgap/airgap_test.go index e000233d764b..773ac2edc395 100644 --- a/inttest/airgap/airgap_test.go +++ b/inttest/airgap/airgap_test.go @@ -17,16 +17,23 @@ limitations under the License. package airgap import ( + "context" "fmt" "strings" "testing" - "github.com/stretchr/testify/suite" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/k0sproject/k0s/inttest/common" "github.com/k0sproject/k0s/pkg/airgap" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" + "github.com/k0sproject/k0s/pkg/kubernetes/watch" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) const k0sConfig = ` @@ -40,10 +47,11 @@ type AirgapSuite struct { } func (s *AirgapSuite) TestK0sGetsUp() { + ctx := s.Context() err := (&common.Airgap{ SSH: s.SSH, Logf: s.T().Logf, - }).LockdownMachines(s.Context(), + }).LockdownMachines(ctx, s.ControllerNode(0), s.WorkerNode(0), ) s.Require().NoError(err) @@ -55,6 +63,8 @@ func (s *AirgapSuite) TestK0sGetsUp() { kc, err := s.KubeClient(s.ControllerNode(0)) s.Require().NoError(err) + ctx = cancelOnBadPulledEvent(ctx, s.T(), kc) + err = s.WaitForNodeReady(s.WorkerNode(0), kc) s.NoError(err) @@ -65,10 +75,10 @@ func (s *AirgapSuite) TestK0sGetsUp() { s.AssertSomeKubeSystemPods(kc) s.T().Log("waiting to see kube-router pods ready") - s.NoError(common.WaitForKubeRouterReady(s.Context(), kc), "kube-router did not start") + s.NoError(common.WaitForKubeRouterReady(ctx, kc), "kube-router did not start") // at that moment we can assume that all pods has at least started - events, err := kc.CoreV1().Events("kube-system").List(s.Context(), v1.ListOptions{ + events, err := kc.CoreV1().Events("kube-system").List(ctx, metav1.ListOptions{ Limit: 100, }) s.Require().NoError(err) @@ -95,14 +105,47 @@ func (s *AirgapSuite) TestK0sGetsUp() { s.Fail("Require all images be installed from bundle") } // Check that all the images have io.cri-containerd.pinned=pinned label - ssh, err := s.SSH(s.Context(), s.WorkerNode(0)) + ssh, err := s.SSH(ctx, s.WorkerNode(0)) s.Require().NoError(err) for _, i := range airgap.GetImageURIs(v1beta1.DefaultClusterSpec(), true) { output, err := ssh.ExecWithOutput(s.Context(), fmt.Sprintf(`k0s ctr i ls "name==%s"`, i)) s.Require().NoError(err) s.Require().Contains(output, "io.cri-containerd.pinned=pinned", "expected %s image to have io.cri-containerd.pinned=pinned label", i) } +} + +func cancelOnBadPulledEvent(ctx context.Context, t *testing.T, kc *kubernetes.Clientset) context.Context { + testCtx, cancelTest := context.WithCancelCause(ctx) + watchCtx, closeWatch := context.WithCancel(context.Background()) + + pullErr := make(chan error, 1) + t.Cleanup(func() { + closeWatch() + if pullErr, ok := <-pullErr; assert.True(t, ok, "Expected to receive an item") { + if pullErr != watchCtx.Err() { + assert.Failf(t, "Error while watching Pulled events", "%v", pullErr) + } + } + }) + go func() { + defer close(pullErr) + pullErr <- watch.Events(kc.CoreV1().Events("")). + WithFieldSelector(fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.kind", "Pod"), + fields.OneTermEqualSelector("reason", "Pulled"), + )). + Until(watchCtx, func(e *corev1.Event) (bool, error) { + if !strings.HasSuffix(e.Message, "already present on machine") { + err := fmt.Errorf("unexpected Pulled event: %s", e.Message) + cancelTest(err) + return false, err + } + return false, nil + }) + }() + + return testCtx } func TestAirgapSuite(t *testing.T) { diff --git a/inttest/ap-airgap/airgap_test.go b/inttest/ap-airgap/airgap_test.go index 3ebf72a4bf7c..1c5be89fd2f8 100644 --- a/inttest/ap-airgap/airgap_test.go +++ b/inttest/ap-airgap/airgap_test.go @@ -15,15 +15,22 @@ package airgap import ( + "context" "fmt" + "strings" "testing" + "github.com/k0sproject/k0s/inttest/common" + aptest "github.com/k0sproject/k0s/inttest/common/autopilot" apconst "github.com/k0sproject/k0s/pkg/autopilot/constant" appc "github.com/k0sproject/k0s/pkg/autopilot/controller/plans/core" + "github.com/k0sproject/k0s/pkg/kubernetes/watch" - "github.com/k0sproject/k0s/inttest/common" - aptest "github.com/k0sproject/k0s/inttest/common/autopilot" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -35,6 +42,13 @@ type airgapSuite struct { // state which we can run tests against. func (s *airgapSuite) SetupTest() { ctx := s.Context() + err := (&common.Airgap{ + SSH: s.SSH, + Logf: s.T().Logf, + }).LockdownMachines(ctx, + s.ControllerNode(0), s.WorkerNode(0), + ) + s.Require().NoError(err) // Note that the token is intentionally empty for the first controller s.Require().NoError(s.InitController(0, "--disable-components=metrics-server")) @@ -46,6 +60,11 @@ func (s *airgapSuite) SetupTest() { s.Require().NoError(aptest.WaitForCRDByName(ctx, cClient, "plans")) s.Require().NoError(aptest.WaitForCRDByName(ctx, cClient, "controlnodes")) + wClient, err := s.KubeClient(s.ControllerNode(0)) + s.Require().NoError(err) + + failOnBadPulledEvent(ctx, s.T(), wClient) + // Create a worker join token workerJoinToken, err := s.GetJoinToken("worker") s.Require().NoError(err) @@ -53,21 +72,39 @@ func (s *airgapSuite) SetupTest() { // Start the workers using the join token s.Require().NoError(s.RunWorkersWithToken(workerJoinToken)) - wClient, err := s.KubeClient(s.ControllerNode(0)) - s.Require().NoError(err) - s.Require().NoError(s.WaitForNodeReady(s.WorkerNode(0), wClient)) } -func (s *airgapSuite) TestApply() { - err := (&common.Airgap{ - SSH: s.SSH, - Logf: s.T().Logf, - }).LockdownMachines(s.Context(), - s.ControllerNode(0), s.WorkerNode(0), - ) - s.Require().NoError(err) +func failOnBadPulledEvent(ctx context.Context, t *testing.T, kc *kubernetes.Clientset) { + watchCtx, closeWatch := context.WithCancel(context.Background()) + + pullErr := make(chan error, 1) + t.Cleanup(func() { + closeWatch() + if pullErr, ok := <-pullErr; assert.True(t, ok, "Expected to receive an item") { + if pullErr != watchCtx.Err() { + assert.Failf(t, "Error while watching Pulled events", "%v", pullErr) + } + } + }) + go func() { + defer close(pullErr) + pullErr <- watch.Events(kc.CoreV1().Events("")). + WithFieldSelector(fields.AndSelectors( + fields.OneTermEqualSelector("involvedObject.kind", "Pod"), + fields.OneTermEqualSelector("reason", "Pulled"), + )). + Until(watchCtx, func(e *corev1.Event) (bool, error) { + if !strings.HasSuffix(e.Message, "already present on machine") { + return false, fmt.Errorf("unexpected Pulled event: %s", e.Message) + } + return false, nil + }) + }() +} + +func (s *airgapSuite) TestApply() { planTemplate := ` apiVersion: autopilot.k0sproject.io/v1beta2 kind: Plan