Skip to content

Commit

Permalink
refactor: stream to os out, but utilise accordions for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
shreddedbacon committed Sep 29, 2023
1 parent 09a5d03 commit f6b5cd5
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 46 deletions.
25 changes: 13 additions & 12 deletions cmd/tasks_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/uselagoon/build-deploy-tool/internal/generator"
"github.com/uselagoon/build-deploy-tool/internal/lagoon"
"github.com/uselagoon/build-deploy-tool/internal/tasklib"
"io/ioutil"
"os"
"strings"
)

var runPreRollout, runPostRollout, outOfClusterConfig bool
Expand All @@ -31,7 +32,7 @@ var taskCmd = &cobra.Command{
// unidleThenRun is a wrapper around 'runCleanTaskInEnvironment' used for pre-rollout tasks
// We actually want to unidle the namespace before running pre-rollout tasks,
// so we wrap the usual task runner before calling it.
func unidleThenRun(namespace string, incoming lagoon.Task) error {
func unidleThenRun(namespace string, prePost string, incoming lagoon.Task) error {
fmt.Printf("Unidling namespace with RequiresEnvironment: %v, ScaleMaxIterations:%v and ScaleWaitTime:%v\n", incoming.RequiresEnvironment, incoming.ScaleMaxIterations, incoming.ScaleWaitTime)
err := lagoon.UnidleNamespace(context.TODO(), namespace, incoming.ScaleMaxIterations, incoming.ScaleWaitTime)
if err != nil {
Expand All @@ -47,7 +48,7 @@ func unidleThenRun(namespace string, incoming lagoon.Task) error {
return fmt.Errorf("There was a problem when unidling the environment for pre-rollout tasks: %v", err.Error())
}
}
return runCleanTaskInEnvironment(namespace, incoming)
return runCleanTaskInEnvironment(namespace, prePost, incoming)
}

var tasksPreRun = &cobra.Command{
Expand All @@ -65,7 +66,7 @@ var tasksPreRun = &cobra.Command{
}
fmt.Println("Executing Pre-rollout Tasks")

taskIterator, err := iterateTaskGenerator(true, unidleThenRun, buildValues, true)
taskIterator, err := iterateTaskGenerator(true, unidleThenRun, buildValues, "PreRollout", true)
if err != nil {
fmt.Println("Pre-rollout Tasks Failed with the following error: ", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -97,7 +98,7 @@ var tasksPostRun = &cobra.Command{

fmt.Println("Executing Post-rollout Tasks")

taskIterator, err := iterateTaskGenerator(false, runCleanTaskInEnvironment, buildValues, true)
taskIterator, err := iterateTaskGenerator(false, runCleanTaskInEnvironment, buildValues, "PostRollout", true)
if err != nil {
fmt.Println("Pre-rollout Tasks Failed with the following error: ", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -159,7 +160,7 @@ type iterateTaskFuncType func(tasklib.TaskEnvironment, []lagoon.Task) (bool, err
// that lets the resulting function reference values as part of the closure, thereby cleaning up the definition a bit.
// so, the variables passed into the factor (eg. allowDeployMissingErrors, etc.) determine the way the function behaves,
// without needing to pass those into the call to the returned function itself.
func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnvironmentFuncType, buildValues generator.BuildValues, debug bool) (iterateTaskFuncType, error) {
func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnvironmentFuncType, buildValues generator.BuildValues, prePost string, debug bool) (iterateTaskFuncType, error) {
var retErr error
namespace := buildValues.Namespace
if namespace == "" {
Expand Down Expand Up @@ -188,7 +189,7 @@ func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnv
return true, err
}
if runTask {
err := taskRunner(namespace, task)
err := taskRunner(namespace, prePost, task)
if err != nil {
switch e := err.(type) {
case *lagoon.DeploymentMissingError:
Expand Down Expand Up @@ -242,12 +243,12 @@ func evaluateWhenConditionsForTaskInEnvironment(environment tasklib.TaskEnvironm
return retBool, nil
}

type runTaskInEnvironmentFuncType func(namespace string, incoming lagoon.Task) error
type runTaskInEnvironmentFuncType func(namespace string, prePost string, incoming lagoon.Task) error

// runCleanTaskInEnvironment implements runTaskInEnvironmentFuncType and will
// 1. make sure the task we pass to the execution environment is free of any data we don't want (hence the new task)
// 2. will actually execute the task in the environment.
func runCleanTaskInEnvironment(namespace string, incoming lagoon.Task) error {
func runCleanTaskInEnvironment(namespace string, prePost string, incoming lagoon.Task) error {
task := lagoon.NewTask()
task.Command = incoming.Command
task.Namespace = namespace
Expand All @@ -257,7 +258,7 @@ func runCleanTaskInEnvironment(namespace string, incoming lagoon.Task) error {
task.Name = incoming.Name
task.ScaleMaxIterations = incoming.ScaleMaxIterations
task.ScaleWaitTime = incoming.ScaleWaitTime
err := lagoon.ExecuteTaskInEnvironment(task)
err := lagoon.ExecuteTaskInEnvironment(task, prePost)
return err
}

Expand Down
15 changes: 10 additions & 5 deletions cmd/tasks_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,65 +166,70 @@ func Test_iterateTaskGenerator(t *testing.T) {
tests := []struct {
name string
debug bool
prePost string
args args
wantError bool
}{
{name: "Runs with no errors",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return nil
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PreRollout",
wantError: false,
},
{name: "Allows deploy missing errors and keeps rolling (pre rollout case)",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.DeploymentMissingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PreRollout",
wantError: false,
},
{name: "Does not allow deploy missing errors and stops with error (post rollout)",
args: args{
allowDeployMissingErrors: false,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.DeploymentMissingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PostRollout",
wantError: true,
},
{name: "Allows deploy missing errors but stops with any other error (pre rollout)",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.PodScalingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PostRollout",
wantError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := iterateTaskGenerator(tt.args.allowDeployMissingErrors, tt.args.taskRunner, tt.args.buildValues, tt.debug)
got, _ := iterateTaskGenerator(tt.args.allowDeployMissingErrors, tt.args.taskRunner, tt.args.buildValues, tt.prePost, tt.debug)
_, err := got(tasklib.TaskEnvironment{}, tt.args.tasks)

if tt.wantError && err == nil {
Expand Down
50 changes: 21 additions & 29 deletions internal/lagoon/tasks.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package lagoon

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -111,7 +110,7 @@ func getConfig() (*rest.Config, error) {
}

// ExecuteTaskInEnvironment .
func ExecuteTaskInEnvironment(task Task) error {
func ExecuteTaskInEnvironment(task Task, prePost string) error {
command := make([]string, 0, 5)
if task.Shell != "" {
command = append(command, task.Shell)
Expand All @@ -122,16 +121,14 @@ func ExecuteTaskInEnvironment(task Task) error {
command = append(command, "-c")
command = append(command, task.Command)

output, err := ExecTaskInPod(task, command, false) //(task.Service, task.Namespace, command, false, task.Container, task.ScaleWaitTime, task.ScaleMaxIterations)
fmt.Printf("##############################################\nBEGIN %s: %s\n##############################################\n", prePost, task.Name)

err := ExecTaskInPod(task, command, false) //(task.Service, task.Namespace, command, false, task.Container, task.ScaleWaitTime, task.ScaleMaxIterations)

if err != nil {
fmt.Printf("Failed to execute task `%v` due to reason `%v`\n", task.Name, err.Error())
}

if len(output) > 0 {
fmt.Printf("*** Task output ***\n %v \n *** output ends ***\n", output)
}

return err
}

Expand All @@ -140,16 +137,16 @@ func ExecTaskInPod(
task Task,
command []string,
tty bool,
) (string, error) {
) error {

restCfg, err := getConfig()
if err != nil {
return "", err
return err
}

clientset, err := GetK8sClient(restCfg)
if err != nil {
return "", fmt.Errorf("unable to create client: %v", err)
return fmt.Errorf("unable to create client: %v", err)
}

depClient := clientset.AppsV1().Deployments(task.Namespace)
Expand All @@ -160,11 +157,11 @@ func ExecTaskInPod(
LabelSelector: lagoonServiceLabel,
})
if err != nil {
return "", err
return err
}

if len(deployments.Items) == 0 {
return "", &DeploymentMissingError{ErrorText: "No deployments found matching label: " + lagoonServiceLabel}
return &DeploymentMissingError{ErrorText: "No deployments found matching label: " + lagoonServiceLabel}
}

deployment := &deployments.Items[0]
Expand All @@ -174,14 +171,14 @@ func ExecTaskInPod(
numIterations := 1
for ; !podReady; numIterations++ {
if numIterations >= task.ScaleMaxIterations { //break if there's some reason we can't scale the pod
return "", errors.New("Failed to scale pods for " + deployment.Name)
return errors.New("Failed to scale pods for " + deployment.Name)
}
if deployment.Status.ReadyReplicas == 0 {
fmt.Println(fmt.Sprintf("No ready replicas found, scaling up. Attempt %d/%d", numIterations, task.ScaleMaxIterations))

scale, err := clientset.AppsV1().Deployments(task.Namespace).GetScale(context.TODO(), deployment.Name, v1.GetOptions{})
if err != nil {
return "", err
return err
}

if scale.Spec.Replicas == 0 {
Expand All @@ -191,7 +188,7 @@ func ExecTaskInPod(
time.Sleep(time.Second * time.Duration(task.ScaleWaitTime))
deployment, err = depClient.Get(context.TODO(), deployment.Name, v1.GetOptions{})
if err != nil {
return "", err
return err
}
} else {
podReady = true
Expand All @@ -206,7 +203,7 @@ func ExecTaskInPod(
})

if err != nil {
return "", err
return err
}

var pod corev1.Pod
Expand All @@ -227,7 +224,7 @@ func ExecTaskInPod(
}
}
if !foundRunningPod {
return "", &PodScalingError{
return &PodScalingError{
ErrorText: "Unable to find running Pod for namespace: " + task.Namespace,
}
}
Expand All @@ -248,7 +245,7 @@ func ExecTaskInPod(
scheme := runtime.NewScheme()

if err := corev1.AddToScheme(scheme); err != nil {
return "", fmt.Errorf("error adding to scheme: %v", err)
return fmt.Errorf("error adding to scheme: %v", err)
}
if len(command) == 0 {
command = []string{"sh"}
Expand All @@ -265,24 +262,19 @@ func ExecTaskInPod(

exec, err := remotecommand.NewSPDYExecutor(restCfg, "POST", req.URL())
if err != nil {
return "", fmt.Errorf("error while creating Executor: %v", err)
return fmt.Errorf("error while creating Executor: %v", err)
}

var stdOut, stdErr bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
Stdout: &stdOut,
Stderr: &stdErr,
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: tty,
})
buffers := []io.Reader{&stdOut, &stdErr}
var output bytes.Buffer
reader := io.MultiReader(buffers...)
output.ReadFrom(reader)
if err != nil {
return output.String(), fmt.Errorf("Error returned: %v", err)
return fmt.Errorf("Error returned: %v", err)
}

return output.String(), nil
return nil

}

Expand Down

0 comments on commit f6b5cd5

Please sign in to comment.