Skip to content

Commit

Permalink
fix(rolling-update): avoid replica takeover for single pod instance (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhra303 authored Jun 18, 2024
1 parent 9be1c01 commit 64cfcba
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 66 deletions.
30 changes: 18 additions & 12 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
fullSyncedUpdatedReplicas := 0
for _, replica := range replicas {
// Check only with latest replicas
onLatestVersion, err := isPodOnLatestVersion(ctx, r.Client, &replica, &updatedStatefulset)
onLatestVersion, err := isPodOnLatestVersion(&replica, &updatedStatefulset)
if err != nil {
log.Error(err, "could not check if pod is on latest version")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
if onLatestVersion {
// check if the replica had a full sync
log.Info("New Replica found. Checking if replica had a full sync", "pod", replica.Name)
isStableState, err := isStableState(ctx, r.Client, &replica)
isStableState, err := isStableState(ctx, &replica)
if err != nil {
log.Error(err, "could not check if pod is in stable state")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
Expand All @@ -167,7 +167,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// delete older version replicas
for _, replica := range replicas {
// Check if pod is on latest version
onLatestVersion, err := isPodOnLatestVersion(ctx, r.Client, &replica, &updatedStatefulset)
onLatestVersion, err := isPodOnLatestVersion(&replica, &updatedStatefulset)
if err != nil {
log.Error(err, "could not check if pod is on latest version")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
Expand All @@ -186,13 +186,17 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

latestReplica, err := getLatestReplica(ctx, r.Client, &updatedStatefulset)
if err != nil {
log.Error(err, "could not get latest replica")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
var latestReplica *corev1.Pod
var err error
if len(replicas) > 0 {
latestReplica, err = getLatestReplica(ctx, r.Client, &updatedStatefulset)
if err != nil {
log.Error(err, "could not get latest replica")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}

masterOnLatest, err := isPodOnLatestVersion(ctx, r.Client, &master, &updatedStatefulset)
masterOnLatest, err := isPodOnLatestVersion(&master, &updatedStatefulset)
if err != nil {
log.Error(err, "could not check if pod is on latest version")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
Expand All @@ -202,10 +206,12 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// are on latest version
if !masterOnLatest {
// Update master now
log.Info("Running REPLTAKEOVER on replica", "pod", master.Name)
if err := replTakeover(ctx, r.Client, latestReplica); err != nil {
log.Error(err, "could not update master")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
if latestReplica != nil {
log.Info("Running REPLTAKEOVER on replica", "pod", master.Name)
if err := replTakeover(ctx, r.Client, latestReplica); err != nil {
log.Error(err, "could not update master")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Shutting down master %s", master.Name))

Expand Down
14 changes: 1 addition & 13 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"

dfv1alpha1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1"
resourcesv1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1"
"github.com/dragonflydb/dragonfly-operator/internal/resources"
"github.com/go-logr/logr"
"github.com/redis/go-redis/v9"
Expand All @@ -36,7 +35,7 @@ import (
// and provides methods to handle replication.
type DragonflyInstance struct {
// Dragonfly is the relevant Dragonfly CRD that it performs actions over
df *resourcesv1.Dragonfly
df *dfv1alpha1.Dragonfly

client client.Client
log logr.Logger
Expand Down Expand Up @@ -65,17 +64,6 @@ func GetDragonflyInstanceFromPod(ctx context.Context, c client.Client, pod *core
}, nil
}

func (dfi *DragonflyInstance) getStatus(ctx context.Context) (string, error) {
if err := dfi.client.Get(ctx, types.NamespacedName{
Name: dfi.df.Name,
Namespace: dfi.df.Namespace,
}, dfi.df); err != nil {
return "", err
}

return dfi.df.Status.Phase, nil
}

func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error {
dfi.log.Info("Configuring replication")

Expand Down
44 changes: 3 additions & 41 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/dragonflydb/dragonfly-operator/internal/resources"
"github.com/redis/go-redis/v9"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -40,7 +38,7 @@ const (

// isPodOnLatestVersion returns if the Given pod is on the updatedRevision
// of the given statefulset or not
func isPodOnLatestVersion(ctx context.Context, c client.Client, pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (bool, error) {
func isPodOnLatestVersion(pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (bool, error) {
// Get the pod's revision
podRevision, ok := pod.Labels[appsv1.StatefulSetRevisionLabel]
if !ok {
Expand Down Expand Up @@ -74,7 +72,7 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1.
// Iterate over the pods and find a replica which is on the latest version
for _, pod := range podList.Items {

isLatest, err := isPodOnLatestVersion(ctx, c, &pod, statefulSet)
isLatest, err := isPodOnLatestVersion(&pod, statefulSet)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,43 +110,7 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e
return nil
}

func waitForStatefulSetReady(ctx context.Context, c client.Client, name, namespace string, maxDuration time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, maxDuration)
defer cancel()
for {
select {
case <-ctx.Done():
return fmt.Errorf("timed out waiting for statefulset to be ready")
default:
// Check if the statefulset is ready
ready, err := isStatefulSetReady(ctx, c, name, namespace)
if err != nil {
return err
}
if ready {
return nil
}
}
}
}

func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace string) (bool, error) {
var statefulSet appsv1.StatefulSet
if err := c.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &statefulSet); err != nil {
return false, nil
}

if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas {
return true, nil
}

return false, nil
}

func isStableState(ctx context.Context, c client.Client, pod *corev1.Pod) (bool, error) {
func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
// wait until pod IP is ready
if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning {
return false, nil
Expand Down

0 comments on commit 64cfcba

Please sign in to comment.