Skip to content

Commit

Permalink
Enable context-aware logging in the inttest wait functions
Browse files Browse the repository at this point in the history
1. Put the currently executed test in the bootloose suite's context
   before returning it to the test suites.
2. Introduce the `logfFrom` function and use it everywhere. It
   determines the appropriate logging callback from the context,
   potentially using testing.T's Logf, or falling back to logrus's
   Infof.

Also address the deprecation of `PollImmediateUntilWithContext` by
replacing it with `PollUntilContextCancel`, as stated in the deprecation
notice.

Signed-off-by: Tom Wieczorek <[email protected]>
  • Loading branch information
twz123 committed Nov 22, 2023
1 parent 2602a1e commit 487ed5f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
3 changes: 2 additions & 1 deletion inttest/common/bootloosesuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/k0sproject/k0s/internal/pkg/file"
apclient "github.com/k0sproject/k0s/pkg/client/clientset"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/k0sproject/k0s/pkg/k0scontext"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"
extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"

Expand Down Expand Up @@ -284,7 +285,7 @@ func (s *BootlooseSuite) waitForSSH(ctx context.Context) {
// Context returns this suite's context, augmented with the currently
// executing test so that context-aware helpers (e.g. logfFrom) can log via
// the test's Logf. Fails the test if no suite context has been installed.
func (s *BootlooseSuite) Context() context.Context {
	suiteCtx := s.ctx
	s.Require().NotNil(suiteCtx, "No suite context installed")
	return k0scontext.WithValue(suiteCtx, s.T())
}

// ControllerNode gets the node name of given controller index
Expand Down
34 changes: 26 additions & 8 deletions inttest/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"regexp"
"strings"
"syscall"
"testing"
"time"

"github.com/k0sproject/k0s/pkg/constant"
"github.com/k0sproject/k0s/pkg/k0scontext"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -46,10 +48,13 @@ import (
"github.com/sirupsen/logrus"
)

// LogfFn will be used whenever something needs to be logged. Its signature is
// compatible with both (*testing.T).Logf and logrus.Infof.
type LogfFn func(format string, args ...any)

// Poll repeatedly evaluates condition (starting immediately, then every
// 100 ms) until it reports done, returns an error, or the given context is
// canceled or expires.
func Poll(ctx context.Context, condition wait.ConditionWithContextFunc) error {
	const pollInterval = 100 * time.Millisecond
	// The "true" argument requests an immediate first tick before waiting.
	return wait.PollUntilContextCancel(ctx, pollInterval, true, condition)
}

// WaitForKubeRouterReady waits to see all kube-router pods healthy as long as
Expand Down Expand Up @@ -106,7 +111,7 @@ func WaitForMetricsReady(ctx context.Context, c *rest.Config) error {
watchAPIServices := watch.FromClient[*apiregistrationv1.APIServiceList, apiregistrationv1.APIService]
return watchAPIServices(clientset.ApiregistrationV1().APIServices()).
WithObjectName("v1beta1.metrics.k8s.io").
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(service *apiregistrationv1.APIService) (bool, error) {
for _, c := range service.Status.Conditions {
if c.Type == apiregistrationv1.Available {
Expand All @@ -125,6 +130,7 @@ func WaitForMetricsReady(ctx context.Context, c *rest.Config) error {
func WaitForNodeReadyStatus(ctx context.Context, clients kubernetes.Interface, nodeName string, status corev1.ConditionStatus) error {
return watch.Nodes(clients.CoreV1().Nodes()).
WithObjectName(nodeName).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(node *corev1.Node) (done bool, err error) {
for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady {
Expand All @@ -145,7 +151,7 @@ func WaitForNodeReadyStatus(ctx context.Context, clients kubernetes.Interface, n
func WaitForDaemonSet(ctx context.Context, kc *kubernetes.Clientset, name string) error {
return watch.DaemonSets(kc.AppsV1().DaemonSets("kube-system")).
WithObjectName(name).
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(ds *appsv1.DaemonSet) (bool, error) {
return ds.Status.NumberAvailable == ds.Status.DesiredNumberScheduled, nil
})
Expand All @@ -156,7 +162,7 @@ func WaitForDaemonSet(ctx context.Context, kc *kubernetes.Clientset, name string
func WaitForDeployment(ctx context.Context, kc *kubernetes.Clientset, name, namespace string) error {
return watch.Deployments(kc.AppsV1().Deployments(namespace)).
WithObjectName(name).
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(deployment *appsv1.Deployment) (bool, error) {
for _, c := range deployment.Status.Conditions {
if c.Type == appsv1.DeploymentAvailable {
Expand All @@ -177,7 +183,7 @@ func WaitForDeployment(ctx context.Context, kc *kubernetes.Clientset, name, name
func WaitForStatefulSet(ctx context.Context, kc *kubernetes.Clientset, name, namespace string) error {
return watch.StatefulSets(kc.AppsV1().StatefulSets(namespace)).
WithObjectName(name).
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(s *appsv1.StatefulSet) (bool, error) {
return s.Status.ReadyReplicas == *s.Spec.Replicas, nil
})
Expand All @@ -203,7 +209,7 @@ func waitForDefaultStorageClass(kc *kubernetes.Clientset) wait.ConditionWithCont
func WaitForPod(ctx context.Context, kc *kubernetes.Clientset, name, namespace string) error {
return watch.Pods(kc.CoreV1().Pods(namespace)).
WithObjectName(name).
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(ctx, func(pod *corev1.Pod) (bool, error) {
for _, cond := range pod.Status.Conditions {
if cond.Type == corev1.PodReady {
Expand Down Expand Up @@ -251,7 +257,7 @@ func WaitForLease(ctx context.Context, kc *kubernetes.Clientset, name string, na
watchLeases := watch.FromClient[*coordinationv1.LeaseList, coordinationv1.Lease]
if err := watchLeases(kc.CoordinationV1().Leases(namespace)).
WithObjectName(name).
WithErrorCallback(RetryWatchErrors(logrus.Infof)).
WithErrorCallback(RetryWatchErrors(logfFrom(ctx))).
Until(
ctx, func(lease *coordinationv1.Lease) (bool, error) {
holderIdentity = *lease.Spec.HolderIdentity
Expand All @@ -265,7 +271,7 @@ func WaitForLease(ctx context.Context, kc *kubernetes.Clientset, name string, na
return holderIdentity, nil
}

func RetryWatchErrors(logf func(format string, args ...any)) watch.ErrorCallback {
func RetryWatchErrors(logf LogfFn) watch.ErrorCallback {
return func(err error) (time.Duration, error) {
if retryDelay, e := watch.IsRetryable(err); e == nil {
logf("Encountered transient watch error, retrying in %s: %v", retryDelay, err)
Expand Down Expand Up @@ -327,3 +333,15 @@ func VerifyKubeletMetrics(ctx context.Context, kc *kubernetes.Clientset, node st
return false, nil
})
}

// Selects the logging callback for the given context: an explicitly stored
// LogfFn takes precedence; otherwise the Logf method of a *testing.T found in
// the context is used; logrus.Infof serves as the last-resort fallback.
func logfFrom(ctx context.Context) LogfFn {
	if stored := k0scontext.Value[LogfFn](ctx); stored != nil {
		return stored
	}
	if test := k0scontext.Value[*testing.T](ctx); test != nil {
		return test.Logf
	}
	return logrus.Infof
}

0 comments on commit 487ed5f

Please sign in to comment.