Skip to content

Commit

Permalink
Merge pull request #221 from uselagoon/combined-stream
Browse files Browse the repository at this point in the history
refactor: combined task output to single stream to retain order
  • Loading branch information
tobybellwood authored Oct 9, 2023
2 parents 71f21f8 + 03057e2 commit 8140f18
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 49 deletions.
26 changes: 13 additions & 13 deletions cmd/tasks_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/uselagoon/build-deploy-tool/internal/generator"
"github.com/uselagoon/build-deploy-tool/internal/lagoon"
"github.com/uselagoon/build-deploy-tool/internal/tasklib"
"io/ioutil"
"os"
"strings"
)

var runPreRollout, runPostRollout, outOfClusterConfig bool
Expand All @@ -31,7 +31,7 @@ var taskCmd = &cobra.Command{
// unidleThenRun is a wrapper around 'runCleanTaskInEnvironment' used for pre-rollout tasks
// We actually want to unidle the namespace before running pre-rollout tasks,
// so we wrap the usual task runner before calling it.
func unidleThenRun(namespace string, incoming lagoon.Task) error {
func unidleThenRun(namespace string, prePost string, incoming lagoon.Task) error {
fmt.Printf("Unidling namespace with RequiresEnvironment: %v, ScaleMaxIterations:%v and ScaleWaitTime:%v\n", incoming.RequiresEnvironment, incoming.ScaleMaxIterations, incoming.ScaleWaitTime)
err := lagoon.UnidleNamespace(context.TODO(), namespace, incoming.ScaleMaxIterations, incoming.ScaleWaitTime)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func unidleThenRun(namespace string, incoming lagoon.Task) error {
return fmt.Errorf("There was a problem when unidling the environment for pre-rollout tasks: %v", err.Error())
}
}
return runCleanTaskInEnvironment(namespace, incoming)
return runCleanTaskInEnvironment(namespace, prePost, incoming)
}

var tasksPreRun = &cobra.Command{
Expand All @@ -65,7 +65,7 @@ var tasksPreRun = &cobra.Command{
}
fmt.Println("Executing Pre-rollout Tasks")

taskIterator, err := iterateTaskGenerator(true, unidleThenRun, buildValues, true)
taskIterator, err := iterateTaskGenerator(true, unidleThenRun, buildValues, "Pre-Rollout", true)
if err != nil {
fmt.Println("Pre-rollout Tasks Failed with the following error: ", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -97,7 +97,7 @@ var tasksPostRun = &cobra.Command{

fmt.Println("Executing Post-rollout Tasks")

taskIterator, err := iterateTaskGenerator(false, runCleanTaskInEnvironment, buildValues, true)
taskIterator, err := iterateTaskGenerator(false, runCleanTaskInEnvironment, buildValues, "Post-Rollout", true)
if err != nil {
fmt.Println("Pre-rollout Tasks Failed with the following error: ", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -159,7 +159,7 @@ type iterateTaskFuncType func(tasklib.TaskEnvironment, []lagoon.Task) (bool, err
// that lets the resulting function reference values as part of the closure, thereby cleaning up the definition a bit.
// so, the variables passed into the factor (eg. allowDeployMissingErrors, etc.) determine the way the function behaves,
// without needing to pass those into the call to the returned function itself.
func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnvironmentFuncType, buildValues generator.BuildValues, debug bool) (iterateTaskFuncType, error) {
func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnvironmentFuncType, buildValues generator.BuildValues, prePost string, debug bool) (iterateTaskFuncType, error) {
var retErr error
namespace := buildValues.Namespace
if namespace == "" {
Expand All @@ -168,7 +168,7 @@ func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnv
if _, err := os.Stat(filename); errors.Is(err, os.ErrNotExist) {
retErr = fmt.Errorf("A target namespace is required to run pre/post-rollout tasks")
}
nsb, err := ioutil.ReadFile(filename)
nsb, err := os.ReadFile(filename)
if err != nil {
retErr = err
}
Expand All @@ -188,7 +188,7 @@ func iterateTaskGenerator(allowDeployMissingErrors bool, taskRunner runTaskInEnv
return true, err
}
if runTask {
err := taskRunner(namespace, task)
err := taskRunner(namespace, prePost, task)
if err != nil {
switch e := err.(type) {
case *lagoon.DeploymentMissingError:
Expand Down Expand Up @@ -242,12 +242,12 @@ func evaluateWhenConditionsForTaskInEnvironment(environment tasklib.TaskEnvironm
return retBool, nil
}

type runTaskInEnvironmentFuncType func(namespace string, incoming lagoon.Task) error
type runTaskInEnvironmentFuncType func(namespace string, prePost string, incoming lagoon.Task) error

// runCleanTaskInEnvironment implements runTaskInEnvironmentFuncType and will
// 1. make sure the task we pass to the execution environment is free of any data we don't want (hence the new task)
// 2. will actually execute the task in the environment.
func runCleanTaskInEnvironment(namespace string, incoming lagoon.Task) error {
func runCleanTaskInEnvironment(namespace string, prePost string, incoming lagoon.Task) error {
task := lagoon.NewTask()
task.Command = incoming.Command
task.Namespace = namespace
Expand All @@ -257,7 +257,7 @@ func runCleanTaskInEnvironment(namespace string, incoming lagoon.Task) error {
task.Name = incoming.Name
task.ScaleMaxIterations = incoming.ScaleMaxIterations
task.ScaleWaitTime = incoming.ScaleWaitTime
err := lagoon.ExecuteTaskInEnvironment(task)
err := lagoon.ExecuteTaskInEnvironment(task, prePost)
return err
}

Expand Down
15 changes: 10 additions & 5 deletions cmd/tasks_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,65 +166,70 @@ func Test_iterateTaskGenerator(t *testing.T) {
tests := []struct {
name string
debug bool
prePost string
args args
wantError bool
}{
{name: "Runs with no errors",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return nil
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PreRollout",
wantError: false,
},
{name: "Allows deploy missing errors and keeps rolling (pre rollout case)",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.DeploymentMissingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PreRollout",
wantError: false,
},
{name: "Does not allow deploy missing errors and stops with error (post rollout)",
args: args{
allowDeployMissingErrors: false,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.DeploymentMissingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PostRollout",
wantError: true,
},
{name: "Allows deploy missing errors but stops with any other error (pre rollout)",
args: args{
allowDeployMissingErrors: true,
taskRunner: func(namespace string, incoming lagoon.Task) error {
taskRunner: func(namespace string, prePost string, incoming lagoon.Task) error {
return &lagoon.PodScalingError{}
},
tasks: []lagoon.Task{
{},
},
buildValues: generator.BuildValues{Namespace: "empty"},
},
prePost: "PostRollout",
wantError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := iterateTaskGenerator(tt.args.allowDeployMissingErrors, tt.args.taskRunner, tt.args.buildValues, tt.debug)
got, _ := iterateTaskGenerator(tt.args.allowDeployMissingErrors, tt.args.taskRunner, tt.args.buildValues, tt.prePost, tt.debug)
_, err := got(tasklib.TaskEnvironment{}, tt.args.tasks)

if tt.wantError && err == nil {
Expand Down
57 changes: 28 additions & 29 deletions internal/lagoon/tasks.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package lagoon

import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -85,9 +84,9 @@ func getConfig() (*rest.Config, error) {
if *kubeconfig == "" {
//Fall back on out of cluster
// read the deployer token.
token, err := ioutil.ReadFile("/var/run/secrets/lagoon/deployer/token")
token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
if err != nil {
token, err = ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
token, err = os.ReadFile("/var/run/secrets/lagoon/deployer/token")
if err != nil {
return nil, err
}
Expand All @@ -110,7 +109,7 @@ func getConfig() (*rest.Config, error) {
}

// ExecuteTaskInEnvironment .
func ExecuteTaskInEnvironment(task Task) error {
func ExecuteTaskInEnvironment(task Task, prePost string) error {
command := make([]string, 0, 5)
if task.Shell != "" {
command = append(command, task.Shell)
Expand All @@ -121,18 +120,19 @@ func ExecuteTaskInEnvironment(task Task) error {
command = append(command, "-c")
command = append(command, task.Command)

stdout, stderr, err := ExecTaskInPod(task, command, false) //(task.Service, task.Namespace, command, false, task.Container, task.ScaleWaitTime, task.ScaleMaxIterations)
fmt.Printf("##############################################\nBEGIN %s %s\n##############################################\n", prePost, task.Name)
st := time.Now()

err := ExecTaskInPod(task, command, false) //(task.Service, task.Namespace, command, false, task.Container, task.ScaleWaitTime, task.ScaleMaxIterations)

if err != nil {
fmt.Printf("Failed to execute task `%v` due to reason `%v`\n", task.Name, err.Error())
}

if len(stdout) > 0 {
fmt.Printf("*** Task STDOUT ***\n %v \n *** STDOUT Ends ***\n", stdout)
}
if len(stderr) > 0 {
fmt.Printf("*** Task STDERR ***\n %v \n *** STDERR Ends ***\n", stderr)
}
et := time.Now()
diff := time.Time{}.Add(et.Sub(st))
tz, _ := et.Zone()
fmt.Printf("##############################################\nSTEP %s %s: Completed at %s (%s) Duration %s Elapsed %s\n##############################################\n", prePost, task.Name, et.Format("2006-01-02 15:04:05"), tz, diff.Format("15:04:05"), diff.Format("15:04:05"))

return err
}
Expand All @@ -142,16 +142,16 @@ func ExecTaskInPod(
task Task,
command []string,
tty bool,
) (string, string, error) {
) error {

restCfg, err := getConfig()
if err != nil {
return "", "", err
return err
}

clientset, err := GetK8sClient(restCfg)
if err != nil {
return "", "", fmt.Errorf("unable to create client: %v", err)
return fmt.Errorf("unable to create client: %v", err)
}

depClient := clientset.AppsV1().Deployments(task.Namespace)
Expand All @@ -162,11 +162,11 @@ func ExecTaskInPod(
LabelSelector: lagoonServiceLabel,
})
if err != nil {
return "", "", err
return err
}

if len(deployments.Items) == 0 {
return "", "", &DeploymentMissingError{ErrorText: "No deployments found matching label: " + lagoonServiceLabel}
return &DeploymentMissingError{ErrorText: "No deployments found matching label: " + lagoonServiceLabel}
}

deployment := &deployments.Items[0]
Expand All @@ -176,14 +176,14 @@ func ExecTaskInPod(
numIterations := 1
for ; !podReady; numIterations++ {
if numIterations >= task.ScaleMaxIterations { //break if there's some reason we can't scale the pod
return "", "", errors.New("Failed to scale pods for " + deployment.Name)
return errors.New("Failed to scale pods for " + deployment.Name)
}
if deployment.Status.ReadyReplicas == 0 {
fmt.Println(fmt.Sprintf("No ready replicas found, scaling up. Attempt %d/%d", numIterations, task.ScaleMaxIterations))

scale, err := clientset.AppsV1().Deployments(task.Namespace).GetScale(context.TODO(), deployment.Name, v1.GetOptions{})
if err != nil {
return "", "", err
return err
}

if scale.Spec.Replicas == 0 {
Expand All @@ -193,7 +193,7 @@ func ExecTaskInPod(
time.Sleep(time.Second * time.Duration(task.ScaleWaitTime))
deployment, err = depClient.Get(context.TODO(), deployment.Name, v1.GetOptions{})
if err != nil {
return "", "", err
return err
}
} else {
podReady = true
Expand All @@ -208,7 +208,7 @@ func ExecTaskInPod(
})

if err != nil {
return "", "", err
return err
}

var pod corev1.Pod
Expand All @@ -229,7 +229,7 @@ func ExecTaskInPod(
}
}
if !foundRunningPod {
return "", "", &PodScalingError{
return &PodScalingError{
ErrorText: "Unable to find running Pod for namespace: " + task.Namespace,
}
}
Expand All @@ -250,7 +250,7 @@ func ExecTaskInPod(
scheme := runtime.NewScheme()

if err := corev1.AddToScheme(scheme); err != nil {
return "", "", fmt.Errorf("error adding to scheme: %v", err)
return fmt.Errorf("error adding to scheme: %v", err)
}
if len(command) == 0 {
command = []string{"sh"}
Expand All @@ -267,20 +267,19 @@ func ExecTaskInPod(

exec, err := remotecommand.NewSPDYExecutor(restCfg, "POST", req.URL())
if err != nil {
return "", "", fmt.Errorf("error while creating Executor: %v", err)
return fmt.Errorf("error while creating Executor: %v", err)
}

var stdout, stderr bytes.Buffer
err = exec.Stream(remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: tty,
})
if err != nil {
return stdout.String(), stderr.String(), fmt.Errorf("Error returned: %v", err)
return fmt.Errorf("Error returned: %v", err)
}

return stdout.String(), stderr.String(), nil
return nil

}

Expand Down
10 changes: 8 additions & 2 deletions legacy/build-deploy-docker-compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -806,11 +806,14 @@ if [ "${LAGOON_PREROLLOUT_DISABLED}" != "true" ]; then
build-deploy-tool tasks pre-rollout
else
echo "pre-rollout tasks are currently disabled LAGOON_PREROLLOUT_DISABLED is set to true"
set +x
currentStepEnd="$(date +"%Y-%m-%d %H:%M:%S")"
patchBuildStep "${buildStartTime}" "${previousStepEnd}" "${currentStepEnd}" "${NAMESPACE}" "preRolloutsCompleted" "Pre-Rollout Tasks"
set -x
fi

set +x
currentStepEnd="$(date +"%Y-%m-%d %H:%M:%S")"
patchBuildStep "${buildStartTime}" "${previousStepEnd}" "${currentStepEnd}" "${NAMESPACE}" "preRolloutsCompleted" "Pre-Rollout Tasks"
previousStepEnd=${currentStepEnd}
beginBuildStep "Service Configuration Phase 1" "serviceConfigurationPhase1"
set -x
Expand Down Expand Up @@ -1638,11 +1641,14 @@ if [ "${LAGOON_POSTROLLOUT_DISABLED}" != "true" ]; then
build-deploy-tool tasks post-rollout
else
echo "post-rollout tasks are currently disabled LAGOON_POSTROLLOUT_DISABLED is set to true"
set +x
currentStepEnd="$(date +"%Y-%m-%d %H:%M:%S")"
patchBuildStep "${buildStartTime}" "${previousStepEnd}" "${currentStepEnd}" "${NAMESPACE}" "postRolloutsCompleted" "Post-Rollout Tasks"
set -x
fi

set +x
currentStepEnd="$(date +"%Y-%m-%d %H:%M:%S")"
patchBuildStep "${buildStartTime}" "${previousStepEnd}" "${currentStepEnd}" "${NAMESPACE}" "postRolloutsCompleted" "Post-Rollout Tasks"
previousStepEnd=${currentStepEnd}
beginBuildStep "Build and Deploy" "finalizingBuild"
set -x
Expand Down

0 comments on commit 8140f18

Please sign in to comment.