Skip to content

Commit

Permalink
feat: support idling messages from core
Browse files Browse the repository at this point in the history
  • Loading branch information
shreddedbacon committed Nov 26, 2024
1 parent 27b41c1 commit 07f40b7
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 5 deletions.
2 changes: 0 additions & 2 deletions apis/lagoon/v1beta2/lagoontask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ func (b TaskType) String() string {

// LagoonTaskSpec defines the desired state of LagoonTask
type LagoonTaskSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Key string `json:"key,omitempty"`
Task schema.LagoonTaskInfo `json:"task,omitempty"`
Project LagoonTaskProject `json:"project,omitempty"`
Expand Down
3 changes: 0 additions & 3 deletions config/crd/bases/crd.lagoon.sh_lagoontasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,6 @@ spec:
- project
type: object
key:
description: |-
INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
Important: Run "make" to regenerate code after modifying this file
type: string
misc:
description: LagoonMiscInfo defines the resource or backup information
Expand Down
113 changes: 113 additions & 0 deletions controllers/namespace/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package namespace

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strconv"

"github.com/go-logr/logr"
"github.com/uselagoon/machinery/api/schema"
"github.com/uselagoon/remote-controller/internal/messenger"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// NamespaceReconciler reconciles idling
type NamespaceReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
EnableMQ bool
Messaging *messenger.Messenger
LagoonTargetName string
}

type Idled struct {
Idled bool `json:"idled"`
}

func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
opLog := r.Log.WithValues("namespace", req.NamespacedName)

var namespace corev1.Namespace
if err := r.Get(ctx, req.NamespacedName, &namespace); err != nil {
return ctrl.Result{}, ignoreNotFound(err)
}

// this would be nice to be a lagoon label :)
if val, ok := namespace.ObjectMeta.Labels["idling.amazee.io/idled"]; ok {
idled, _ := strconv.ParseBool(val)
opLog.Info(fmt.Sprintf("environment %s idle state %t", namespace.Name, idled))
if r.EnableMQ {
var projectName, environmentName string
if p, ok := namespace.ObjectMeta.Labels["lagoon.sh/project"]; ok {
projectName = p
}
if e, ok := namespace.ObjectMeta.Labels["lagoon.sh/environment"]; ok {
environmentName = e
}
idling := Idled{
Idled: idled,
}
idlingJSON, _ := json.Marshal(idling)
msg := schema.LagoonMessage{
Type: "idling",
Namespace: namespace.Name,
Meta: &schema.LagoonLogMeta{
Environment: environmentName,
Project: projectName,
Cluster: r.LagoonTargetName,
AdvancedData: base64.StdEncoding.EncodeToString(idlingJSON),
},
}
msgBytes, err := json.Marshal(msg)
if err != nil {
opLog.Error(err, "Unable to encode message as JSON")
}
// @TODO: if we can't publish the message because for some reason, log the error and move on
// this may result in the state being out of sync in lagoon but eventually will be consistent
if err := r.Messaging.Publish("lagoon-tasks:controller", msgBytes); err != nil {
return ctrl.Result{}, nil
}
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the watch on the namespace resource with an event filter (see predicates.go)
func (r *NamespaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Namespace{}).
WithEventFilter(NamespacePredicates{}).
Complete(r)
}

// will ignore not found errors
func ignoreNotFound(err error) error {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
38 changes: 38 additions & 0 deletions controllers/namespace/predicates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package namespace

import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// NamespacePredicates defines the funcs for predicates
type NamespacePredicates struct {
predicate.Funcs
}

// Create is used when a creation event is received by the controller.
func (n NamespacePredicates) Create(e event.CreateEvent) bool {
return false
}

// Delete is used when a deletion event is received by the controller.
func (n NamespacePredicates) Delete(e event.DeleteEvent) bool {
return false
}

// Update is used when an update event is received by the controller.
func (n NamespacePredicates) Update(e event.UpdateEvent) bool {
if oldIdled, ok := e.ObjectOld.GetLabels()["idling.amazee.io/idled"]; ok {
if newIdled, ok := e.ObjectNew.GetLabels()["idling.amazee.io/idled"]; ok {
if oldIdled != newIdled {
return true
}
}
}
return false
}

// Generic is used when any other event is received by the controller.
func (n NamespacePredicates) Generic(e event.GenericEvent) bool {
return false
}
32 changes: 32 additions & 0 deletions internal/messenger/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,38 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:environment:idling":
opLog.Info(
fmt.Sprintf(
"Received environment idling request for project %s, environment %s - %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
namespace,
),
)
// idle or unidle an environment, optionally forcible scale it so it can't be unidled by the ingress
err := m.ScaleOrIdleEnvironment(ctx, opLog, namespace, jobSpec)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:environment:service":
opLog.Info(
fmt.Sprintf(
"Received environment service request for project %s, environment %s service - %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
namespace,
),
)
// idle an environment, optionally forcible scale it so it can't be unidled by the ingress
err := m.EnvironmentServiceState(ctx, opLog, namespace, jobSpec)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
default:
// if we get something that we don't know about, spit out the entire message
opLog.Info(
Expand Down
114 changes: 114 additions & 0 deletions internal/messenger/tasks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/go-logr/logr"
lagoonv1beta2 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta2"
"github.com/uselagoon/remote-controller/internal/helpers"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -92,3 +98,111 @@ func createAdvancedTask(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec,
}
return nil
}

type Idling struct {
Idle bool `json:"idle"`
ForceScale bool `json:"forceScale"`
}

type Service struct {
Name string `json:"name"`
State string `json:"state"`
}

func (m *Messenger) ScaleOrIdleEnvironment(ctx context.Context, opLog logr.Logger, ns string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
namespace := &corev1.Namespace{}
err := m.Client.Get(ctx, types.NamespacedName{
Name: ns,
}, namespace)
if err != nil {
return err
}
idling := Idling{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &idling); err != nil {
opLog.Error(err,
"Unable to unmarshal the idling json.",
)
return err
}
if idling.Idle {
if idling.ForceScale {
// this would be nice to be a lagoon label :)
namespace.ObjectMeta.Labels["idling.amazee.io/force-scaled"] = "true"
} else {
// this would be nice to be a lagoon label :)
namespace.ObjectMeta.Labels["idling.amazee.io/force-idled"] = "true"
}
} else {
// this would be nice to be a lagoon label :)
namespace.ObjectMeta.Labels["idling.amazee.io/unidle"] = "true"
}
if err := m.Client.Update(context.Background(), namespace); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to update namespace %s to set idle state.",
ns,
),
)
return err
}
return nil
}

func (m *Messenger) EnvironmentServiceState(ctx context.Context, opLog logr.Logger, ns string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
deployment := &appsv1.Deployment{}
service := Service{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &service); err != nil {
opLog.Error(err,
"Unable to unmarshal the service json.",
)
return err
}
err := m.Client.Get(ctx, types.NamespacedName{
Name: service.Name,
Namespace: ns,
}, deployment)
if err != nil {
return err
}
update := false
switch service.State {
case "restart":
deployment.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)
update = true
case "stop":
if *deployment.Spec.Replicas > 0 {
// if the service has replicas, then save the replica count and scale it to 0
deployment.ObjectMeta.Annotations["service.lagoon.sh/replicas"] = strconv.FormatInt(int64(*deployment.Spec.Replicas), 10)
replicas := int32(0)
deployment.Spec.Replicas = &replicas
update = true
}
case "start":
if *deployment.Spec.Replicas == 0 {
// if the service has no replicas, set it back to what the previous replica value was
prevReplicas, err := strconv.Atoi(deployment.ObjectMeta.Annotations["service.lagoon.sh/replicas"])
if err != nil {
return err
}
replicas := int32(prevReplicas)
deployment.Spec.Replicas = &replicas
delete(deployment.ObjectMeta.Annotations, "service.lagoon.sh/replicas")
update = true
}
default:
// nothing to do
return nil
}
if update {
if err := m.Client.Update(ctx, deployment); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to update deployment %s to change its state.",
ns,
),
)
return err
}
}
return nil
}
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
lagoonv1beta1 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta1"
lagoonv1beta2 "github.com/uselagoon/remote-controller/apis/lagoon/v1beta2"
harborctrl "github.com/uselagoon/remote-controller/controllers/harbor"
"github.com/uselagoon/remote-controller/controllers/namespace"
lagoonv1beta1ctrl "github.com/uselagoon/remote-controller/controllers/v1beta1"
lagoonv1beta2ctrl "github.com/uselagoon/remote-controller/controllers/v1beta2"
"github.com/uselagoon/remote-controller/internal/messenger"
Expand Down Expand Up @@ -885,6 +886,18 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "LagoonTask")
os.Exit(1)
}
// start the namespace reconciler
if err = (&namespace.NamespaceReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("namespace").WithName("Namespace"),
Scheme: mgr.GetScheme(),
EnableMQ: enableMQ,
Messaging: messaging,
LagoonTargetName: lagoonTargetName,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Namespace")
os.Exit(1)
}

// v1beta2 is the latest version
if err = (&lagoonv1beta2ctrl.LagoonBuildReconciler{
Expand Down

0 comments on commit 07f40b7

Please sign in to comment.