From f1317d037a8250820501f5179e3ff89163e70342 Mon Sep 17 00:00:00 2001 From: shreddedbacon Date: Fri, 5 Jan 2024 20:32:18 +1100 Subject: [PATCH] chore: remove pendingmessage functionality --- apis/lagoon/v1beta1/lagoonbuild_types.go | 5 +- apis/lagoon/v1beta1/lagoonmessaging_types.go | 9 - apis/lagoon/v1beta1/lagoontask_types.go | 5 +- apis/lagoon/v1beta1/zz_generated.deepcopy.go | 45 --- .../crd/bases/crd.lagoon.sh_lagoonbuilds.yaml | 371 ------------------ .../crd/bases/crd.lagoon.sh_lagoontasks.yaml | 371 ------------------ controllers/v1beta1/build_deletionhandlers.go | 114 +----- .../v1beta1/podmonitor_buildhandlers.go | 48 +-- .../v1beta1/podmonitor_taskhandlers.go | 48 +-- internal/messenger/pending_messages.go | 151 ------- main.go | 12 +- 11 files changed, 42 insertions(+), 1137 deletions(-) delete mode 100644 internal/messenger/pending_messages.go diff --git a/apis/lagoon/v1beta1/lagoonbuild_types.go b/apis/lagoon/v1beta1/lagoonbuild_types.go index c901ef22..16f0c706 100644 --- a/apis/lagoon/v1beta1/lagoonbuild_types.go +++ b/apis/lagoon/v1beta1/lagoonbuild_types.go @@ -90,9 +90,8 @@ type LagoonBuild struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec LagoonBuildSpec `json:"spec,omitempty"` - Status LagoonBuildStatus `json:"status,omitempty"` - StatusMessages *LagoonStatusMessages `json:"statusMessages,omitempty"` + Spec LagoonBuildSpec `json:"spec,omitempty"` + Status LagoonBuildStatus `json:"status,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/lagoon/v1beta1/lagoonmessaging_types.go b/apis/lagoon/v1beta1/lagoonmessaging_types.go index f9fe6ecd..fe1fb2e0 100644 --- a/apis/lagoon/v1beta1/lagoonmessaging_types.go +++ b/apis/lagoon/v1beta1/lagoonmessaging_types.go @@ -47,13 +47,4 @@ type LagoonMessage struct { Type string `json:"type,omitempty"` Namespace string `json:"namespace,omitempty"` Meta *LagoonLogMeta `json:"meta,omitempty"` - // BuildInfo *LagoonBuildInfo `json:"buildInfo,omitempty"` -} - -// LagoonStatusMessages is where unsent messages are stored for re-sending. -type LagoonStatusMessages struct { - StatusMessage *LagoonLog `json:"statusMessage,omitempty"` - BuildLogMessage *LagoonLog `json:"buildLogMessage,omitempty"` - TaskLogMessage *LagoonLog `json:"taskLogMessage,omitempty"` - EnvironmentMessage *LagoonMessage `json:"environmentMessage,omitempty"` } diff --git a/apis/lagoon/v1beta1/lagoontask_types.go b/apis/lagoon/v1beta1/lagoontask_types.go index 3231e530..aaffcff4 100644 --- a/apis/lagoon/v1beta1/lagoontask_types.go +++ b/apis/lagoon/v1beta1/lagoontask_types.go @@ -158,9 +158,8 @@ type LagoonTask struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec LagoonTaskSpec `json:"spec,omitempty"` - Status LagoonTaskStatus `json:"status,omitempty"` - StatusMessages *LagoonStatusMessages `json:"statusMessages,omitempty"` + Spec LagoonTaskSpec `json:"spec,omitempty"` + Status LagoonTaskStatus `json:"status,omitempty"` } // +kubebuilder:object:root=true diff --git a/apis/lagoon/v1beta1/zz_generated.deepcopy.go b/apis/lagoon/v1beta1/zz_generated.deepcopy.go index 842af614..45931b74 100644 --- a/apis/lagoon/v1beta1/zz_generated.deepcopy.go +++ b/apis/lagoon/v1beta1/zz_generated.deepcopy.go @@ -81,11 +81,6 @@ func (in *LagoonBuild) DeepCopyInto(out *LagoonBuild) { in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) - if in.StatusMessages != nil { - in, out := &in.StatusMessages, &out.StatusMessages - *out = new(LagoonStatusMessages) - (*in).DeepCopyInto(*out) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LagoonBuild. @@ -318,41 +313,6 @@ func (in *LagoonMiscInfo) DeepCopy() *LagoonMiscInfo { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *LagoonStatusMessages) DeepCopyInto(out *LagoonStatusMessages) { - *out = *in - if in.StatusMessage != nil { - in, out := &in.StatusMessage, &out.StatusMessage - *out = new(LagoonLog) - (*in).DeepCopyInto(*out) - } - if in.BuildLogMessage != nil { - in, out := &in.BuildLogMessage, &out.BuildLogMessage - *out = new(LagoonLog) - (*in).DeepCopyInto(*out) - } - if in.TaskLogMessage != nil { - in, out := &in.TaskLogMessage, &out.TaskLogMessage - *out = new(LagoonLog) - (*in).DeepCopyInto(*out) - } - if in.EnvironmentMessage != nil { - in, out := &in.EnvironmentMessage, &out.EnvironmentMessage - *out = new(LagoonMessage) - (*in).DeepCopyInto(*out) - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LagoonStatusMessages. -func (in *LagoonStatusMessages) DeepCopy() *LagoonStatusMessages { - if in == nil { - return nil - } - out := new(LagoonStatusMessages) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LagoonTask) DeepCopyInto(out *LagoonTask) { *out = *in @@ -360,11 +320,6 @@ func (in *LagoonTask) DeepCopyInto(out *LagoonTask) { in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) - if in.StatusMessages != nil { - in, out := &in.StatusMessages, &out.StatusMessages - *out = new(LagoonStatusMessages) - (*in).DeepCopyInto(*out) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LagoonTask. diff --git a/config/crd/bases/crd.lagoon.sh_lagoonbuilds.yaml b/config/crd/bases/crd.lagoon.sh_lagoonbuilds.yaml index bbc2dc2d..c8a2c299 100644 --- a/config/crd/bases/crd.lagoon.sh_lagoonbuilds.yaml +++ b/config/crd/bases/crd.lagoon.sh_lagoonbuilds.yaml @@ -201,377 +201,6 @@ spec: format: byte type: string type: object - statusMessages: - description: LagoonStatusMessages is where unsent messages are stored - for re-sending. - properties: - buildLogMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - environmentMessage: - description: LagoonMessage is used for sending build info back to - Lagoon messaging queue to update the environment or deployment - properties: - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - namespace: - type: string - type: - type: string - type: object - statusMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - taskLogMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - type: object type: object served: true storage: true diff --git a/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml b/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml index f1394f91..d17f6265 100644 --- a/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml +++ b/config/crd/bases/crd.lagoon.sh_lagoontasks.yaml @@ -183,377 +183,6 @@ spec: format: byte type: string type: object - statusMessages: - description: LagoonStatusMessages is where unsent messages are stored - for re-sending. - properties: - buildLogMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - environmentMessage: - description: LagoonMessage is used for sending build info back to - Lagoon messaging queue to update the environment or deployment - properties: - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - namespace: - type: string - type: - type: string - type: object - statusMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - taskLogMessage: - description: LagoonLog is used to sendToLagoonLogs messaging queue - this is general logging information - properties: - event: - type: string - message: - type: string - meta: - description: LagoonLogMeta is the metadata that is used by logging - in Lagoon. - properties: - advancedData: - type: string - branchName: - type: string - buildName: - type: string - buildPhase: - type: string - buildStatus: - type: string - buildStep: - type: string - clusterName: - type: string - endTime: - type: string - environment: - type: string - environmentId: - type: integer - jobName: - type: string - jobStatus: - type: string - jobStep: - type: string - key: - type: string - logLink: - type: string - project: - type: string - projectId: - type: integer - projectName: - type: string - remoteId: - type: string - route: - type: string - routes: - items: - type: string - type: array - services: - items: - type: string - type: array - startTime: - type: string - task: - description: LagoonTaskInfo defines what a task can use to - communicate with Lagoon via SSH/API. - properties: - apiHost: - type: string - command: - type: string - id: - type: string - name: - type: string - service: - type: string - sshHost: - type: string - sshPort: - type: string - taskName: - type: string - required: - - id - type: object - type: object - project: - type: string - severity: - type: string - uuid: - type: string - type: object - type: object type: object served: true storage: true diff --git a/controllers/v1beta1/build_deletionhandlers.go b/controllers/v1beta1/build_deletionhandlers.go index fba4c0e2..03c773ad 100644 --- a/controllers/v1beta1/build_deletionhandlers.go +++ b/controllers/v1beta1/build_deletionhandlers.go @@ -260,15 +260,9 @@ func (r *LagoonBuildReconciler) buildLogsToLagoonLogs(ctx context.Context, // bother to patch the resource?? // leave it for now cause the resource will just be deleted anyway if err := r.Messaging.Publish("lagoon-logs", msgBytes); err != nil { - // if we can't publish the message, set it as a pending message - // overwrite whatever is there as these are just current state messages so it doesn't - // really matter if we don't smootly transition in what we send back to lagoon - r.updateBuildLogMessage(ctx, lagoonBuild, msg) + // if we can't publish the message, just return return } - // if we are able to publish the message, then we need to remove any pending messages from the resource - // and make sure we don't try and publish again - r.removeBuildPendingMessageStatus(ctx, lagoonBuild) } } @@ -351,15 +345,9 @@ func (r *LagoonBuildReconciler) updateDeploymentAndEnvironmentTask(ctx context.C // bother to patch the resource?? // leave it for now cause the resource will just be deleted anyway if err := r.Messaging.Publish("lagoon-tasks:controller", msgBytes); err != nil { - // if we can't publish the message, set it as a pending message - // overwrite whatever is there as these are just current state messages so it doesn't - // really matter if we don't smootly transition in what we send back to lagoon - r.updateEnvironmentMessage(ctx, lagoonBuild, msg) + // if we can't publish the message, just return return } - // if we are able to publish the message, then we need to remove any pending messages from the resource - // and make sure we don't try and publish again - r.removeBuildPendingMessageStatus(ctx, lagoonBuild) } } @@ -411,104 +399,8 @@ func (r *LagoonBuildReconciler) buildStatusLogsToLagoonLogs(ctx context.Context, // bother to patch the resource?? // leave it for now cause the resource will just be deleted anyway if err := r.Messaging.Publish("lagoon-logs", msgBytes); err != nil { - // if we can't publish the message, set it as a pending message - // overwrite whatever is there as these are just current state messages so it doesn't - // really matter if we don't smootly transition in what we send back to lagoon - r.updateBuildStatusMessage(ctx, lagoonBuild, msg) + // if we can't publish the message, just return return } - // if we are able to publish the message, then we need to remove any pending messages from the resource - // and make sure we don't try and publish again - r.removeBuildPendingMessageStatus(ctx, lagoonBuild) } } - -// updateEnvironmentMessage this is called if the message queue is unavailable, it stores the message that would be sent in the lagoon build -func (r *LagoonBuildReconciler) updateEnvironmentMessage(ctx context.Context, - lagoonBuild *lagoonv1beta1.LagoonBuild, - envMessage lagoonv1beta1.LagoonMessage, -) error { - // set the transition time - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "true", - }, - }, - "statusMessages": map[string]interface{}{ - "environmentMessage": envMessage, - }, - }) - if err := r.Patch(ctx, lagoonBuild, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - return fmt.Errorf("Unable to update status condition: %v", err) - } - return nil -} - -// updateBuildStatusMessage this is called if the message queue is unavailable, it stores the message that would be sent in the lagoon build -func (r *LagoonBuildReconciler) updateBuildStatusMessage(ctx context.Context, - lagoonBuild *lagoonv1beta1.LagoonBuild, - statusMessage lagoonv1beta1.LagoonLog, -) error { - // set the transition time - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "true", - }, - }, - "statusMessages": map[string]interface{}{ - "statusMessage": statusMessage, - }, - }) - if err := r.Patch(ctx, lagoonBuild, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - return fmt.Errorf("Unable to update status condition: %v", err) - } - return nil -} - -// removeBuildPendingMessageStatus purges the status messages from the resource once they are successfully re-sent -func (r *LagoonBuildReconciler) removeBuildPendingMessageStatus(ctx context.Context, - lagoonBuild *lagoonv1beta1.LagoonBuild, -) error { - // if we have the pending messages label as true, then we want to remove this label and any pending statusmessages - // so we can avoid double handling, or an old pending message from being sent after a new pending message - if val, ok := lagoonBuild.ObjectMeta.Labels["lagoon.sh/pendingMessages"]; !ok { - if val == "true" { - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "false", - }, - }, - "statusMessages": nil, - }) - if err := r.Patch(ctx, lagoonBuild, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - return fmt.Errorf("Unable to update status condition: %v", err) - } - } - } - return nil -} - -// updateBuildLogMessage this is called if the message queue is unavailable, it stores the message that would be sent in the lagoon build -func (r *LagoonBuildReconciler) updateBuildLogMessage(ctx context.Context, - lagoonBuild *lagoonv1beta1.LagoonBuild, - buildMessage lagoonv1beta1.LagoonLog, -) error { - // set the transition time - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "true", - }, - }, - "statusMessages": map[string]interface{}{ - "buildLogMessage": buildMessage, - }, - }) - if err := r.Patch(ctx, lagoonBuild, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - return fmt.Errorf("Unable to update status condition: %v", err) - } - return nil -} diff --git a/controllers/v1beta1/podmonitor_buildhandlers.go b/controllers/v1beta1/podmonitor_buildhandlers.go index 0159cb65..d41de695 100644 --- a/controllers/v1beta1/podmonitor_buildhandlers.go +++ b/controllers/v1beta1/podmonitor_buildhandlers.go @@ -143,7 +143,7 @@ func (r *LagoonMonitorReconciler) buildLogsToLagoonLogs(ctx context.Context, namespace *corev1.Namespace, condition string, logs []byte, -) (bool, lagoonv1beta1.LagoonLog) { +) error { if r.EnableMQ { buildStep := "running" if condition == "failed" || condition == "complete" || condition == "cancelled" { @@ -208,7 +208,7 @@ Logs on pod %s, assigned to cluster %s // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon // r.updateBuildLogMessage(ctx, lagoonBuild, msg) - return true, msg + return err } if r.EnableDebug { opLog.Info( @@ -222,7 +222,7 @@ Logs on pod %s, assigned to cluster %s // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonLog{} + return nil } // updateDeploymentAndEnvironmentTask sends the status of the build and deployment to the controllerhandler message queue in lagoon, @@ -234,7 +234,7 @@ func (r *LagoonMonitorReconciler) updateDeploymentAndEnvironmentTask(ctx context lagoonEnv *corev1.ConfigMap, namespace *corev1.Namespace, condition string, -) (bool, lagoonv1beta1.LagoonMessage) { +) error { if r.EnableMQ { buildStep := "running" if condition == "failed" || condition == "complete" || condition == "cancelled" { @@ -346,7 +346,7 @@ func (r *LagoonMonitorReconciler) updateDeploymentAndEnvironmentTask(ctx context // if we can't publish the message, set it as a pending message // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon - return true, msg + return err } if r.EnableDebug { opLog.Info( @@ -359,7 +359,7 @@ func (r *LagoonMonitorReconciler) updateDeploymentAndEnvironmentTask(ctx context // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonMessage{} + return nil } // buildStatusLogsToLagoonLogs sends the logs to lagoon-logs message queue, used for general messaging @@ -370,7 +370,7 @@ func (r *LagoonMonitorReconciler) buildStatusLogsToLagoonLogs(ctx context.Contex lagoonEnv *corev1.ConfigMap, namespace *corev1.Namespace, condition string, -) (bool, lagoonv1beta1.LagoonLog) { +) error { if r.EnableMQ { buildStep := "running" if condition == "failed" || condition == "complete" || condition == "cancelled" { @@ -441,7 +441,7 @@ func (r *LagoonMonitorReconciler) buildStatusLogsToLagoonLogs(ctx context.Contex // if we can't publish the message, set it as a pending message // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon - return true, msg + return err } if r.EnableDebug { opLog.Info( @@ -455,7 +455,7 @@ func (r *LagoonMonitorReconciler) buildStatusLogsToLagoonLogs(ctx context.Contex // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonLog{} + return nil } // updateDeploymentWithLogs collects logs from the build containers and ships or stores them @@ -542,7 +542,6 @@ Build %s "lagoon.sh/buildStarted": "true", }, }, - "statusMessages": map[string]interface{}{}, } condition := lagoonv1beta1.LagoonBuildConditions{ @@ -574,31 +573,18 @@ Build %s } // do any message publishing here, and update any pending messages if needed - pendingStatus, pendingStatusMessage := r.buildStatusLogsToLagoonLogs(ctx, opLog, &lagoonBuild, &jobPod, &lagoonEnv, namespace, buildCondition.ToLower()) - pendingEnvironment, pendingEnvironmentMessage := r.updateDeploymentAndEnvironmentTask(ctx, opLog, &lagoonBuild, &jobPod, &lagoonEnv, namespace, buildCondition.ToLower()) - var pendingBuildLog bool - var pendingBuildLogMessage lagoonv1beta1.LagoonLog + if err = r.buildStatusLogsToLagoonLogs(ctx, opLog, &lagoonBuild, &jobPod, &lagoonEnv, namespace, buildCondition.ToLower()); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish build status logs")) + } + if err = r.updateDeploymentAndEnvironmentTask(ctx, opLog, &lagoonBuild, &jobPod, &lagoonEnv, namespace, buildCondition.ToLower()); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish build update")) + } // if the container logs can't be retrieved, we don't want to send any build logs back, as this will nuke // any previously received logs if !strings.Contains(string(allContainerLogs), "unable to retrieve container logs for containerd") { - pendingBuildLog, pendingBuildLogMessage = r.buildLogsToLagoonLogs(ctx, opLog, &lagoonBuild, &jobPod, namespace, buildCondition.ToLower(), allContainerLogs) - } - if pendingStatus || pendingEnvironment || pendingBuildLog { - mergeMap["metadata"].(map[string]interface{})["labels"].(map[string]interface{})["lagoon.sh/pendingMessages"] = "true" - if pendingStatus { - mergeMap["statusMessages"].(map[string]interface{})["statusMessage"] = pendingStatusMessage + if err = r.buildLogsToLagoonLogs(ctx, opLog, &lagoonBuild, &jobPod, namespace, buildCondition.ToLower(), allContainerLogs); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish build logs")) } - if pendingEnvironment { - mergeMap["statusMessages"].(map[string]interface{})["environmentMessage"] = pendingEnvironmentMessage - } - // if the build log message is too long, don't save it - if pendingBuildLog && len(pendingBuildLogMessage.Message) > 1048576 { - mergeMap["statusMessages"].(map[string]interface{})["buildLogMessage"] = pendingBuildLogMessage - } - } - if !pendingStatus && !pendingEnvironment && !pendingBuildLog { - mergeMap["metadata"].(map[string]interface{})["labels"].(map[string]interface{})["lagoon.sh/pendingMessages"] = nil - mergeMap["statusMessages"] = nil } mergePatch, _ := json.Marshal(mergeMap) // check if the build exists diff --git a/controllers/v1beta1/podmonitor_taskhandlers.go b/controllers/v1beta1/podmonitor_taskhandlers.go index 5ffe9305..01960323 100644 --- a/controllers/v1beta1/podmonitor_taskhandlers.go +++ b/controllers/v1beta1/podmonitor_taskhandlers.go @@ -112,7 +112,7 @@ func (r *LagoonMonitorReconciler) taskLogsToLagoonLogs(opLog logr.Logger, jobPod *corev1.Pod, condition string, logs []byte, -) (bool, lagoonv1beta1.LagoonLog) { +) error { if r.EnableMQ && lagoonTask != nil { msg := lagoonv1beta1.LagoonLog{ Severity: "info", @@ -147,12 +147,12 @@ Logs on pod %s, assigned to cluster %s // if we can't publish the message, set it as a pending message // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon - return true, msg + return err } // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonLog{} + return nil } // updateLagoonTask sends the status of the task and deployment to the controllerhandler message queue in lagoon, @@ -161,7 +161,7 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(ctx context.Context, opLog lo lagoonTask *lagoonv1beta1.LagoonTask, jobPod *corev1.Pod, condition string, -) (bool, lagoonv1beta1.LagoonMessage) { +) error { if r.EnableMQ && lagoonTask != nil { if condition == "failed" || condition == "complete" || condition == "cancelled" { time.AfterFunc(31*time.Second, func() { @@ -212,12 +212,12 @@ func (r *LagoonMonitorReconciler) updateLagoonTask(ctx context.Context, opLog lo // if we can't publish the message, set it as a pending message // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon - return true, msg + return err } // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonMessage{} + return nil } // taskStatusLogsToLagoonLogs sends the logs to lagoon-logs message queue, used for general messaging @@ -225,7 +225,7 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger, lagoonTask *lagoonv1beta1.LagoonTask, jobPod *corev1.Pod, condition string, -) (bool, lagoonv1beta1.LagoonLog) { +) error { if r.EnableMQ && lagoonTask != nil { msg := lagoonv1beta1.LagoonLog{ Severity: "info", @@ -258,12 +258,12 @@ func (r *LagoonMonitorReconciler) taskStatusLogsToLagoonLogs(opLog logr.Logger, // if we can't publish the message, set it as a pending message // overwrite whatever is there as these are just current state messages so it doesn't // really matter if we don't smootly transition in what we send back to lagoon - return true, msg + return err } // if we are able to publish the message, then we need to remove any pending messages from the resource // and make sure we don't try and publish again } - return false, lagoonv1beta1.LagoonLog{} + return nil } // updateTaskWithLogs collects logs from the task containers and ships or stores them @@ -332,7 +332,6 @@ Task %s "lagoon.sh/taskStatus": taskCondition.String(), }, }, - "statusMessages": map[string]interface{}{}, } condition := lagoonv1beta1.LagoonTaskConditions{ @@ -350,31 +349,18 @@ Task %s // send any messages to lagoon message queues // update the deployment with the status - pendingStatus, pendingStatusMessage := r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()) - pendingEnvironment, pendingEnvironmentMessage := r.updateLagoonTask(ctx, opLog, &lagoonTask, &jobPod, taskCondition.ToLower()) - var pendingTaskLog bool - var pendingTaskLogMessage lagoonv1beta1.LagoonLog + if err = r.taskStatusLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish task status logs")) + } + if err = r.updateLagoonTask(ctx, opLog, &lagoonTask, &jobPod, taskCondition.ToLower()); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish task update")) + } // if the container logs can't be retrieved, we don't want to send any task logs back, as this will nuke // any previously received logs if !strings.Contains(string(allContainerLogs), "unable to retrieve container logs for containerd") { - pendingTaskLog, pendingTaskLogMessage = r.taskLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower(), allContainerLogs) - } - - if pendingStatus || pendingEnvironment || pendingTaskLog { - mergeMap["metadata"].(map[string]interface{})["labels"].(map[string]interface{})["lagoon.sh/pendingMessages"] = "true" - if pendingStatus { - mergeMap["statusMessages"].(map[string]interface{})["statusMessage"] = pendingStatusMessage - } - if pendingEnvironment { - mergeMap["statusMessages"].(map[string]interface{})["environmentMessage"] = pendingEnvironmentMessage + if err = r.taskLogsToLagoonLogs(opLog, &lagoonTask, &jobPod, taskCondition.ToLower(), allContainerLogs); err != nil { + opLog.Error(err, fmt.Sprintf("Unable to publish task logs")) } - if pendingTaskLog { - mergeMap["statusMessages"].(map[string]interface{})["taskLogMessage"] = pendingTaskLogMessage - } - } - if !pendingStatus && !pendingEnvironment && !pendingTaskLog { - mergeMap["metadata"].(map[string]interface{})["labels"].(map[string]interface{})["lagoon.sh/pendingMessages"] = nil - mergeMap["statusMessages"] = nil } mergePatch, _ := json.Marshal(mergeMap) // check if the task exists diff --git a/internal/messenger/pending_messages.go b/internal/messenger/pending_messages.go deleted file mode 100644 index 26658df6..00000000 --- a/internal/messenger/pending_messages.go +++ /dev/null @@ -1,151 +0,0 @@ -package messenger - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/go-logr/logr" - lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// GetPendingMessages will get any pending messages from the queue and attempt to publish them if possible -func (m *Messenger) GetPendingMessages() { - opLog := ctrl.Log.WithName("handlers").WithName("PendingMessages") - ctx := context.Background() - opLog.Info(fmt.Sprintf("Checking pending build messages across all namespaces")) - m.pendingBuildLogMessages(ctx, opLog) - opLog.Info(fmt.Sprintf("Checking pending task messages across all namespaces")) - m.pendingTaskLogMessages(ctx, opLog) -} - -func (m *Messenger) pendingBuildLogMessages(ctx context.Context, opLog logr.Logger) { - pendingMsgs := &lagoonv1beta1.LagoonBuildList{} - listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{ - client.MatchingLabels(map[string]string{ - "lagoon.sh/pendingMessages": "true", - "lagoon.sh/controller": m.ControllerNamespace, - }), - }) - if err := m.Client.List(ctx, pendingMsgs, listOption); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to list LagoonBuilds, there may be none or something went wrong")) - return - } - for _, build := range pendingMsgs.Items { - // get the latest resource in case it has been updated since the loop started - if err := m.Client.Get(ctx, types.NamespacedName{ - Name: build.ObjectMeta.Name, - Namespace: build.ObjectMeta.Namespace, - }, &build); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to get LagoonBuild, something went wrong")) - break - } - opLog.Info(fmt.Sprintf("LagoonBuild %s has pending messages, attempting to re-send", build.ObjectMeta.Name)) - - // try to re-publish message or break and try the next build with pending message - if build.StatusMessages.StatusMessage != nil { - statusBytes, _ := json.Marshal(build.StatusMessages.StatusMessage) - if err := m.Publish("lagoon-logs", statusBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if build.StatusMessages.BuildLogMessage != nil { - logBytes, _ := json.Marshal(build.StatusMessages.BuildLogMessage) - if err := m.Publish("lagoon-logs", logBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if build.StatusMessages.EnvironmentMessage != nil { - envBytes, _ := json.Marshal(build.StatusMessages.EnvironmentMessage) - if err := m.Publish("lagoon-tasks:controller", envBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - // if we managed to send all the pending messages, then update the resource to remove the pending state - // so we don't send the same message multiple times - opLog.Info(fmt.Sprintf("Sent pending messages for LagoonBuild %s", build.ObjectMeta.Name)) - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "false", - }, - }, - "statusMessages": nil, - }) - if err := m.Client.Patch(ctx, &build, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to update status condition")) - break - } - } - return -} - -func (m *Messenger) pendingTaskLogMessages(ctx context.Context, opLog logr.Logger) { - pendingMsgs := &lagoonv1beta1.LagoonTaskList{} - listOption := (&client.ListOptions{}).ApplyOptions([]client.ListOption{ - client.MatchingLabels(map[string]string{ - "lagoon.sh/pendingMessages": "true", - "lagoon.sh/controller": m.ControllerNamespace, - }), - }) - if err := m.Client.List(ctx, pendingMsgs, listOption); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to list LagoonBuilds, there may be none or something went wrong")) - return - } - for _, task := range pendingMsgs.Items { - // get the latest resource in case it has been updated since the loop started - if err := m.Client.Get(ctx, types.NamespacedName{ - Name: task.ObjectMeta.Name, - Namespace: task.ObjectMeta.Namespace, - }, &task); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to get LagoonBuild, something went wrong")) - break - } - opLog.Info(fmt.Sprintf("LagoonTasl %s has pending messages, attempting to re-send", task.ObjectMeta.Name)) - - // try to re-publish message or break and try the next build with pending message - if task.StatusMessages.StatusMessage != nil { - statusBytes, _ := json.Marshal(task.StatusMessages.StatusMessage) - if err := m.Publish("lagoon-logs", statusBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if task.StatusMessages.TaskLogMessage != nil { - taskLogBytes, _ := json.Marshal(task.StatusMessages.TaskLogMessage) - if err := m.Publish("lagoon-logs", taskLogBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - if task.StatusMessages.EnvironmentMessage != nil { - envBytes, _ := json.Marshal(task.StatusMessages.EnvironmentMessage) - if err := m.Publish("lagoon-tasks:controller", envBytes); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to publush message")) - break - } - } - // if we managed to send all the pending messages, then update the resource to remove the pending state - // so we don't send the same message multiple times - opLog.Info(fmt.Sprintf("Sent pending messages for LagoonTask %s", task.ObjectMeta.Name)) - mergePatch, _ := json.Marshal(map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "lagoon.sh/pendingMessages": "false", - }, - }, - "statusMessages": nil, - }) - if err := m.Client.Patch(ctx, &task, client.RawPatch(types.MergePatchType, mergePatch)); err != nil { - opLog.Error(err, fmt.Sprintf("Unable to update status condition")) - break - } - } - return -} diff --git a/main.go b/main.go index 7238b1ab..c9034592 100644 --- a/main.go +++ b/main.go @@ -85,7 +85,6 @@ func main() { var enableLeaderElection bool var enableMQ bool var leaderElectionID string - var pendingMessageCron string var mqWorkers int var rabbitRetryInterval int var startupConnectionAttempts int @@ -191,8 +190,7 @@ func main() { "The retry interval for rabbitmq.") flag.StringVar(&leaderElectionID, "leader-election-id", "lagoon-builddeploy-leader-election-helper", "The ID to use for leader election.") - flag.StringVar(&pendingMessageCron, "pending-message-cron", "15,45 * * * *", - "The cron definition for pending messages.") + flag.String("pending-message-cron", "", "This does nothing and will be removed in a future version.") flag.IntVar(&startupConnectionAttempts, "startup-connection-attempts", 10, "The number of startup attempts before exiting.") flag.IntVar(&startupConnectionInterval, "startup-connection-interval-seconds", 30, @@ -375,7 +373,6 @@ func main() { mqPass = helpers.GetEnv("RABBITMQ_PASSWORD", mqPass) mqHost = helpers.GetEnv("RABBITMQ_HOSTNAME", mqHost) lagoonTargetName = helpers.GetEnv("LAGOON_TARGET_NAME", lagoonTargetName) - pendingMessageCron = helpers.GetEnv("PENDING_MESSAGE_CRON", pendingMessageCron) overrideBuildDeployImage = helpers.GetEnv("OVERRIDE_BUILD_DEPLOY_DIND_IMAGE", overrideBuildDeployImage) namespacePrefix = helpers.GetEnv("NAMESPACE_PREFIX", namespacePrefix) if len(namespacePrefix) > 8 { @@ -657,13 +654,6 @@ func main() { if enableMQ { setupLog.Info("starting messaging handler") go messaging.Consumer(lagoonTargetName) - - // use cron to run a pending message task - // this will check any `LagoonBuild` resources for the pendingMessages label - // and attempt to re-publish them - c.AddFunc(pendingMessageCron, func() { - messaging.GetPendingMessages() - }) } buildQoSConfig := lagoonv1beta1ctrl.BuildQoS{