Skip to content

Commit

Permalink
Check Pulled events for airgap inttests
Browse files Browse the repository at this point in the history
This is an additional safety net to ensure that there are no unwanted
image pulls during the inttests, in case the network airgapping is
ineffective.

Signed-off-by: Tom Wieczorek <[email protected]>
  • Loading branch information
twz123 committed Jan 15, 2024
1 parent 537ebdb commit 38da8c6
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 20 deletions.
57 changes: 50 additions & 7 deletions inttest/airgap/airgap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@ limitations under the License.
package airgap

import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/suite"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/k0sproject/k0s/inttest/common"
"github.com/k0sproject/k0s/pkg/airgap"
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

const k0sConfig = `
Expand All @@ -40,10 +47,11 @@ type AirgapSuite struct {
}

func (s *AirgapSuite) TestK0sGetsUp() {
ctx := s.Context()
err := (&common.Airgap{
SSH: s.SSH,
Logf: s.T().Logf,
}).LockdownMachines(s.Context(),
}).LockdownMachines(ctx,
s.ControllerNode(0), s.WorkerNode(0),
)
s.Require().NoError(err)
Expand All @@ -55,6 +63,8 @@ func (s *AirgapSuite) TestK0sGetsUp() {
kc, err := s.KubeClient(s.ControllerNode(0))
s.Require().NoError(err)

ctx = cancelOnBadPulledEvent(ctx, s.T(), kc)

err = s.WaitForNodeReady(s.WorkerNode(0), kc)
s.NoError(err)

Expand All @@ -65,10 +75,10 @@ func (s *AirgapSuite) TestK0sGetsUp() {
s.AssertSomeKubeSystemPods(kc)

s.T().Log("waiting to see kube-router pods ready")
s.NoError(common.WaitForKubeRouterReady(s.Context(), kc), "kube-router did not start")
s.NoError(common.WaitForKubeRouterReady(ctx, kc), "kube-router did not start")

// at that moment we can assume that all pods has at least started
events, err := kc.CoreV1().Events("kube-system").List(s.Context(), v1.ListOptions{
events, err := kc.CoreV1().Events("kube-system").List(ctx, metav1.ListOptions{
Limit: 100,
})
s.Require().NoError(err)
Expand All @@ -95,14 +105,47 @@ func (s *AirgapSuite) TestK0sGetsUp() {
s.Fail("Require all images be installed from bundle")
}
// Check that all the images have io.cri-containerd.pinned=pinned label
ssh, err := s.SSH(s.Context(), s.WorkerNode(0))
ssh, err := s.SSH(ctx, s.WorkerNode(0))
s.Require().NoError(err)
for _, i := range airgap.GetImageURIs(v1beta1.DefaultClusterSpec(), true) {
output, err := ssh.ExecWithOutput(s.Context(), fmt.Sprintf(`k0s ctr i ls "name==%s"`, i))
s.Require().NoError(err)
s.Require().Contains(output, "io.cri-containerd.pinned=pinned", "expected %s image to have io.cri-containerd.pinned=pinned label", i)
}
}

func cancelOnBadPulledEvent(ctx context.Context, t *testing.T, kc *kubernetes.Clientset) context.Context {
testCtx, cancelTest := context.WithCancelCause(ctx)
watchCtx, closeWatch := context.WithCancel(context.Background())

pullErr := make(chan error, 1)
t.Cleanup(func() {
closeWatch()
if pullErr, ok := <-pullErr; assert.True(t, ok, "Expected to receive an item") {
if pullErr != watchCtx.Err() {
assert.Failf(t, "Error while watching Pulled events", "%v", pullErr)
}
}
})

go func() {
defer close(pullErr)
pullErr <- watch.Events(kc.CoreV1().Events("")).
WithFieldSelector(fields.AndSelectors(
fields.OneTermEqualSelector("involvedObject.kind", "Pod"),
fields.OneTermEqualSelector("reason", "Pulled"),
)).
Until(watchCtx, func(e *corev1.Event) (bool, error) {
if !strings.HasSuffix(e.Message, "already present on machine") {
err := fmt.Errorf("unexpected Pulled event: %s", e.Message)
cancelTest(err)
return false, err
}
return false, nil
})
}()

return testCtx
}

func TestAirgapSuite(t *testing.T) {
Expand Down
63 changes: 50 additions & 13 deletions inttest/ap-airgap/airgap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
package airgap

import (
"context"
"fmt"
"strings"
"testing"

"github.com/k0sproject/k0s/inttest/common"
aptest "github.com/k0sproject/k0s/inttest/common/autopilot"
apconst "github.com/k0sproject/k0s/pkg/autopilot/constant"
appc "github.com/k0sproject/k0s/pkg/autopilot/controller/plans/core"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

"github.com/k0sproject/k0s/inttest/common"
aptest "github.com/k0sproject/k0s/inttest/common/autopilot"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)

Expand All @@ -35,6 +42,13 @@ type airgapSuite struct {
// state which we can run tests against.
func (s *airgapSuite) SetupTest() {
ctx := s.Context()
err := (&common.Airgap{
SSH: s.SSH,
Logf: s.T().Logf,
}).LockdownMachines(ctx,
s.ControllerNode(0), s.WorkerNode(0),
)
s.Require().NoError(err)

// Note that the token is intentionally empty for the first controller
s.Require().NoError(s.InitController(0, "--disable-components=metrics-server"))
Expand All @@ -46,28 +60,51 @@ func (s *airgapSuite) SetupTest() {
s.Require().NoError(aptest.WaitForCRDByName(ctx, cClient, "plans"))
s.Require().NoError(aptest.WaitForCRDByName(ctx, cClient, "controlnodes"))

wClient, err := s.KubeClient(s.ControllerNode(0))
s.Require().NoError(err)

failOnBadPulledEvent(ctx, s.T(), wClient)

// Create a worker join token
workerJoinToken, err := s.GetJoinToken("worker")
s.Require().NoError(err)

// Start the workers using the join token
s.Require().NoError(s.RunWorkersWithToken(workerJoinToken))

wClient, err := s.KubeClient(s.ControllerNode(0))
s.Require().NoError(err)

s.Require().NoError(s.WaitForNodeReady(s.WorkerNode(0), wClient))
}

func (s *airgapSuite) TestApply() {
err := (&common.Airgap{
SSH: s.SSH,
Logf: s.T().Logf,
}).LockdownMachines(s.Context(),
s.ControllerNode(0), s.WorkerNode(0),
)
s.Require().NoError(err)
func failOnBadPulledEvent(ctx context.Context, t *testing.T, kc *kubernetes.Clientset) {
watchCtx, closeWatch := context.WithCancel(context.Background())

pullErr := make(chan error, 1)
t.Cleanup(func() {
closeWatch()
if pullErr, ok := <-pullErr; assert.True(t, ok, "Expected to receive an item") {
if pullErr != watchCtx.Err() {
assert.Failf(t, "Error while watching Pulled events", "%v", pullErr)
}
}
})

go func() {
defer close(pullErr)
pullErr <- watch.Events(kc.CoreV1().Events("")).
WithFieldSelector(fields.AndSelectors(
fields.OneTermEqualSelector("involvedObject.kind", "Pod"),
fields.OneTermEqualSelector("reason", "Pulled"),
)).
Until(watchCtx, func(e *corev1.Event) (bool, error) {
if !strings.HasSuffix(e.Message, "already present on machine") {
return false, fmt.Errorf("unexpected Pulled event: %s", e.Message)
}
return false, nil
})
}()
}

func (s *airgapSuite) TestApply() {
planTemplate := `
apiVersion: autopilot.k0sproject.io/v1beta2
kind: Plan
Expand Down

0 comments on commit 38da8c6

Please sign in to comment.