Skip to content

Commit

Permalink
plz: make teardown into a unique call
Browse files Browse the repository at this point in the history
  • Loading branch information
yorugac committed Jan 8, 2024
1 parent 9bc1c35 commit 848aa06
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 47 deletions.
11 changes: 11 additions & 0 deletions api/v1alpha1/k6conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const (
// - if True, it's after successful starter but before all runners have finished
TestRunRunning = "TestRunRunning"

// TeardownExecuted indicates whether the `teardown()` has been executed on one of the runners.
// This condition can be used only in PLZ test runs.
TeardownExecuted = "TeardownExecuted"

// CloudTestRun indicates if this test run is supposed to be a cloud test run
// (i.e. with `--out cloud` option).
// - if empty / Unknown, the type of test is unknown yet
Expand Down Expand Up @@ -71,6 +75,13 @@ func Initialize(k6 TestRunI) {
Reason: "TestRunPreparation",
Message: "",
},
metav1.Condition{
Type: TeardownExecuted,
Status: metav1.ConditionFalse,
LastTransitionTime: t,
Reason: "TeardownExecutedFalse",
Message: "",
},
}

UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse)
Expand Down
11 changes: 11 additions & 0 deletions api/v1alpha1/testruni.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,3 +32,13 @@ func TestRunID(k6 TestRunI) string {
}
return k6.GetStatus().TestRunID
}

// ListOptions builds the client list options for the given test run: the
// namespace is the test run's namespace and the label selector matches
// objects labelled `app=k6`, `k6_cr=<test run name>` and `runner=true`.
func ListOptions(k6 TestRunI) *client.ListOptions {
	nsName := k6.NamespacedName()

	runnerLabels := labels.Set{
		"app":    "k6",
		"k6_cr":  nsName.Name,
		"runner": "true",
	}

	return &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(runnerLabels),
		Namespace:     nsName.Namespace,
	}
}
59 changes: 59 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"time"

"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/testrun"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -186,3 +189,59 @@ func getEnvVar(vars []corev1.EnvVar, name string) string {
}
return ""
}

func (r *TestRunReconciler) hostnames(ctx context.Context, log logr.Logger, abortOnUnready bool, opts *client.ListOptions) ([]string, error) {
var (
hostnames []string
err error
)

sl := &v1.ServiceList{}

if err = r.List(ctx, sl, opts); err != nil {
log.Error(err, "Could not list services")
return nil, err
}

for _, service := range sl.Items {
log.Info(fmt.Sprintf("Checking service %s", service.Name))
if isServiceReady(log, &service) {
log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name))
hostnames = append(hostnames, service.Spec.ClusterIP)
} else {
err = errors.New(fmt.Sprintf("%v service is not ready", service.ObjectMeta.Name))

Check failure on line 212 in controllers/common.go

View workflow job for this annotation

GitHub Actions / golangci

S1028: should use fmt.Errorf(...) instead of errors.New(fmt.Sprintf(...)) (gosimple)
log.Info(err.Error())
if abortOnUnready {
return nil, err
}
}
}

return hostnames, nil
}

// runSetup invokes setup() on the first runner in hostnames and then sends
// the returned setup data to all of the given runners.
//
// It returns an error if hostnames is empty or if either of the two calls
// fails.
func runSetup(ctx context.Context, hostnames []string, log logr.Logger) error {
	// Guard the hostnames[0] access below: an empty list would otherwise
	// panic the reconciler instead of surfacing a regular error.
	if len(hostnames) == 0 {
		return errors.New("no runner hostnames available to invoke setup()")
	}

	log.Info("Invoking setup() on the first runner")

	setupData, err := testrun.RunSetup(ctx, hostnames[0])
	if err != nil {
		return err
	}

	log.Info("Sending setup data to the runners")

	return testrun.SetSetupData(ctx, hostnames, setupData)
}

// runTeardown passes the given hostnames to testrun.RunTeardown in order to
// invoke teardown() on a runner.
//
// This is best-effort: a failure is logged and not returned, so callers
// proceed regardless of whether teardown() succeeded.
func runTeardown(ctx context.Context, hostnames []string, log logr.Logger) {
	// Report the hostnames through the structured logger rather than printing
	// them to stdout, so they end up in the operator's regular log stream.
	log.Info("Invoking teardown() on the first responsive runner", "hostnames", hostnames)

	if err := testrun.RunTeardown(ctx, hostnames); err != nil {
		log.Error(err, "Failed to invoke teardown()")
	}
}
46 changes: 11 additions & 35 deletions controllers/k6_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/resources/jobs"
"github.com/grafana/k6-operator/pkg/testrun"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func isServiceReady(log logr.Logger, service *v1.Service) bool {
Expand All @@ -41,13 +38,8 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te

log.Info("Waiting for pods to get ready")

selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.NamespacedName().Name,
"runner": "true",
})
opts := v1alpha1.ListOptions(k6)

opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}
pl := &v1.PodList{}
if err = r.List(ctx, pl, opts); err != nil {
log.Error(err, "Could not list pods")
Expand Down Expand Up @@ -88,38 +80,22 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te

// services

var hostnames []string
sl := &v1.ServiceList{}
log.Info("Waiting for services to get ready")

if err = r.List(ctx, sl, opts); err != nil {
log.Error(err, "Could not list services")
return res, nil
}

for _, service := range sl.Items {
hostnames = append(hostnames, service.Spec.ClusterIP)

if !isServiceReady(log, &service) {
log.Info(fmt.Sprintf("%v service is not ready, aborting", service.ObjectMeta.Name))
return res, nil
} else {
log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name))
}
}

// setup

log.Info("Invoking setup() on the first runner")

setupData, err := testrun.RunSetup(ctx, hostnames[0])
hostnames, err := r.hostnames(ctx, log, true, opts)
log.Info(fmt.Sprintf("err: %v, hostnames: %v", err, hostnames))
if err != nil {
return ctrl.Result{}, err
}

log.Info("Sending setup data to the runners")
log.Info(fmt.Sprintf("%d/%d services ready", len(hostnames), k6.GetSpec().Parallelism))

if err = testrun.SetSetupData(ctx, hostnames, setupData); err != nil {
return ctrl.Result{}, err
// setup

if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
if err := runSetup(ctx, hostnames, log); err != nil {
return ctrl.Result{}, err
}
}

// starter
Expand Down
10 changes: 5 additions & 5 deletions controllers/k6_stopped_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func isJobRunning(log logr.Logger, service *v1.Service) bool {
return true
}

return status.Status().Stopped
return status.Status().Running
}

// StoppedJobs checks if the runners pods have stopped execution.
Expand All @@ -70,17 +70,17 @@ func StoppedJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *
return
}

var count int32
var runningJobs int32
for _, service := range sl.Items {

if isJobRunning(log, &service) {
count++
runningJobs++
}
}

log.Info(fmt.Sprintf("%d/%d runners stopped execution", k6.GetSpec().Parallelism-count, k6.GetSpec().Parallelism))
log.Info(fmt.Sprintf("%d/%d runners stopped execution", k6.GetSpec().Parallelism-runningJobs, k6.GetSpec().Parallelism))

if count > 0 {
if runningJobs > 0 {
return
}

Expand Down
34 changes: 31 additions & 3 deletions controllers/testrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,36 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log
return ctrl.Result{}, nil
}

// wait for the test to finish
if !FinishJobs(ctx, log, k6, r) {
if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
runningTime, _ := v1alpha1.LastUpdate(k6, v1alpha1.TestRunRunning)

if v1alpha1.IsFalse(k6, v1alpha1.TeardownExecuted) {
var allJobsStopped bool
// TODO: figure out baseline time
if time.Since(runningTime) > time.Second*30 {
allJobsStopped = StoppedJobs(ctx, log, k6, r)
}

// The test run reached a regular stop in execution so execute teardown
if v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) && allJobsStopped {
hostnames, err := r.hostnames(ctx, log, false, v1alpha1.ListOptions(k6))
if err != nil {
return ctrl.Result{}, nil
}
runTeardown(ctx, hostnames, log)
v1alpha1.UpdateCondition(k6, v1alpha1.TeardownExecuted, metav1.ConditionTrue)

_, err = r.UpdateStatus(ctx, k6, log)
return ctrl.Result{}, err
// NOTE: we proceed here regardless of whether teardown() is successful or not
} else {
// Test runs can take a long time and usually they aren't supposed
// to be too quick. So check in only periodically.
return ctrl.Result{RequeueAfter: time.Second * 15}, nil
}
}
} else if !FinishJobs(ctx, log, k6, r) {
// wait for the test to finish

// TODO: confirm if this check is needed given the check in the beginning of reconcile
if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) {
Expand Down Expand Up @@ -285,7 +313,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log
log.Info("Changing stage of TestRun status to finished")
k6.GetStatus().Stage = "finished"

_, err := r.UpdateStatus(ctx, k6, log)
_, err = r.UpdateStatus(ctx, k6, log)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/resources/jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ func NewRunnerJob(k6 v1alpha1.TestRunI, index int, token string) (*batchv1.Job,
command = append(command, "--paused")
}

command = append(command, "--no-setup")

// Add an instance tag: in case metrics are stored, they need to be distinguished by instance
command = append(command, "--tag", fmt.Sprintf("instance_id=%d", index))

// Add a job tag: in case metrics are stored, they need to be distinguished by job
command = append(command, "--tag", fmt.Sprintf("job_name=%s", name))

if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
command = append(command, "--no-setup", "--no-teardown", "--linger")
}

command = script.UpdateCommand(command)

var (
Expand Down
Loading

0 comments on commit 848aa06

Please sign in to comment.