Skip to content

Commit

Permalink
todo
Browse files Browse the repository at this point in the history
Signed-off-by: Raghavendra Talur <[email protected]>
  • Loading branch information
raghavendra-talur committed Dec 9, 2024
1 parent 2753759 commit 1314716
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 57 deletions.
2 changes: 1 addition & 1 deletion internal/controller/ramenconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
VolumeUnprotectionEnabledForAsyncVolSync = false
)

// FIXME
// FIXME #1710
const NoS3StoreAvailable = "NoS3"

var ControllerType ramendrv1alpha1.ControllerType
Expand Down
18 changes: 15 additions & 3 deletions internal/controller/volumereplicationgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,10 +639,22 @@ func (v *VRGInstance) clusterDataRestore(result *ctrl.Result) (int, error) {
return numRestoredForVS + numRestoredForVR, fmt.Errorf("failed to restore PV/PVC for VolRep (%w)", err)
}

// Only after both succeed, we mark ClusterDataReady as true
msg := "Restored PVs and PVCs"
if numRestoredForVS+numRestoredForVR == 0 {
objectsRestored, err := v.kubeObjectsRecover(result)
if err != nil {
v.log.Info("Kube objects restore failed")

return numRestoredForVS + numRestoredForVR, fmt.Errorf("failed to restore kube objects (%w)", err)
}

var msg string
// Only after volsync, volrep and kubeObjects succeed, we mark ClusterDataReady as true
if numRestoredForVS+numRestoredForVR == 0 && !objectsRestored {
msg = "Nothing to restore"
} else {
msg = fmt.Sprintf("Restored %d volsync PVs/PVCs and %d volrep PVs/PVCs", numRestoredForVS, numRestoredForVR)
if objectsRestored {
msg = msg + " and kube objects"

Check failure on line 656 in internal/controller/volumereplicationgroup_controller.go

View workflow job for this annotation

GitHub Actions / Golangci Lint (.)

assignOp: replace `msg = msg + " and kube objects"` with `msg += " and kube objects"` (gocritic)
}
}

setVRGClusterDataReadyCondition(&v.instance.Status.Conditions, v.instance.Generation, msg)
Expand Down
174 changes: 125 additions & 49 deletions internal/controller/vrg_kubeobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,65 +495,96 @@ func (v *VRGInstance) kubeObjectsCaptureStatus(status metav1.ConditionStatus, re
}
}

func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result,
s3StoreProfile ramen.S3StoreProfile, objectStorer ObjectStorer,
) error {
func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result) (bool, error) {
if v.kubeObjectProtectionDisabled("recovery") {
return nil
}
v.log.Info("KubeObjects recovery disabled")

localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
if err != nil {
return err
return false, nil
}

vrg := v.instance
sourceVrgNamespaceName, sourceVrgName := vrg.Namespace, vrg.Name
sourcePathNamePrefix := s3PathNamePrefix(sourceVrgNamespaceName, sourceVrgName)
v.log.Info("Restoring KubeObjects")

sourceVrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, sourcePathNamePrefix, sourceVrg); err != nil {
v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")
if len(v.instance.Spec.S3Profiles) == 0 {
v.log.Info("No S3 profiles configured")

return nil
}

captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
v.log.Info("Kube objects capture-to-recover-from identifier nil")
result.Requeue = true

return nil
return false, fmt.Errorf("no S3Profiles configured")
}

vrg.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
veleroNamespaceName := v.veleroNamespaceName()
labels := util.OwnerLabels(vrg)
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", localS3StoreAccessor.S3ProfileName)
v.log.Info(fmt.Sprintf("Restoring KubeObjects to this managed cluster. ProfileList: %v", v.instance.Spec.S3Profiles))

captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
restored, err := v.kubeObjectsRecoverFromS3(result)
if err != nil {
log.Error(err, "Kube objects capture requests query error")
errMsg := fmt.Sprintf("failed to KubeObjects using profile list (%v)", v.instance.Spec.S3Profiles)
v.log.Info(errMsg)

return err
return restored, fmt.Errorf("%s: %w", errMsg, err)
}

recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
if err != nil {
log.Error(err, "Kube objects recover requests query error")
return restored, nil
}

return err
func (v *VRGInstance) kubeObjectsRecoverFromS3(result *ctrl.Result) (bool, error) {
objectsRestored := true

err := errors.New("s3Profiles empty")
NoS3 := false

for _, s3ProfileName := range v.instance.Spec.S3Profiles {
if s3ProfileName == NoS3StoreAvailable {
v.log.Info("NoS3 available to fetch")

NoS3 = true

continue
}

objectStorer, _, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)

continue
}

sourceVrg := &ramen.VolumeReplicationGroup{}
if err := vrgObjectDownload(objectStorer, s3PathNamePrefix(v.instance.Namespace, v.instance.Name),
sourceVrg); err != nil {
v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")

// TODO: check if not finding a vrg is an error
return !objectsRestored, nil
}

captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
if captureToRecoverFromIdentifier == nil {
v.log.Info("Kube objects capture-to-recover-from identifier nil")

// TODO: check if vrg doesn't have a capture-to-recover-from is an error
return !objectsRestored, nil
}

v.instance.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number,
"profile", s3ProfileName)

err = v.kubeObjectsRecoveryStartOrResume(result, s3ProfileName, captureToRecoverFromIdentifier, log)
if err != nil {
return !objectsRestored, err
}

return objectsRestored, nil
}

return v.kubeObjectsRecoveryStartOrResume(
result,
s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile},
sourceVrgNamespaceName, sourceVrgName, captureToRecoverFromIdentifier,
kubeobjects.RequestsMapKeyedByName(captureRequestsStruct),
kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct),
veleroNamespaceName, labels, log,
)
if NoS3 {
// TODO: check if objectsRestored should be false. Affects tests only.
return objectsRestored, nil
}

result.Requeue = true

return !objectsRestored, err
}

func (v *VRGInstance) findS3StoreAccessor(s3StoreProfile ramen.S3StoreProfile) (s3StoreAccessor, error) {
Expand Down Expand Up @@ -627,16 +658,60 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
}
}

func (v *VRGInstance) getCaptureRequests() (map[string]kubeobjects.Request, error) {
captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects capture requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(captureRequestsStruct), nil
}

func (v *VRGInstance) getRecoverRequests() (map[string]kubeobjects.Request, error) {
recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
if err != nil {
return nil, fmt.Errorf("kube objects recover requests query error: %v", err)
}

return kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct), nil
}

func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
result *ctrl.Result, s3ProfileName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, log logr.Logger,
log logr.Logger,
) error {
veleroNamespaceName := v.veleroNamespaceName()
labels := util.OwnerLabels(v.instance)

captureRequests, err := v.getCaptureRequests()
if err != nil {
return err
}

recoverRequests, err := v.getRecoverRequests()
if err != nil {
return err
}

groups := v.recipeElements.RecoverWorkflow
requests := make([]kubeobjects.Request, len(groups))

objectStorer, s3StoreProfile, err := v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
return fmt.Errorf("kube objects recovery object store inaccessible for profile %v: %v", s3ProfileName, err)
}

localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
if err != nil {
return fmt.Errorf("kube objects recovery couldn't build s3StoreAccessor: %v", err)
}

s3StoreAccessor := s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile}

for groupNumber, recoverGroup := range groups {
rg := recoverGroup
log1 := log.WithValues("group", groupNumber, "name", rg.BackupName)
Expand All @@ -646,8 +721,8 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
}
} else {
if err := v.executeRecoverGroup(result, s3StoreAccessor, sourceVrgNamespaceName,
sourceVrgName, captureToRecoverFromIdentifier, captureRequests,
if err := v.executeRecoverGroup(result, s3StoreAccessor,
captureToRecoverFromIdentifier, captureRequests,
recoverRequests, veleroNamespaceName, labels, groupNumber, rg,
requests, log1); err != nil {
return err
Expand All @@ -663,12 +738,13 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
}

func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName string,
captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
captureRequests, recoverRequests map[string]kubeobjects.Request,
veleroNamespaceName string, labels map[string]string, groupNumber int,
rg kubeobjects.RecoverSpec, requests []kubeobjects.Request, log1 logr.Logger,
) error {
sourceVrgName := v.instance.Name
sourceVrgNamespaceName := v.instance.Namespace
request, ok, submit, cleanup := v.getRecoverOrProtectRequest(
captureRequests, recoverRequests, s3StoreAccessor,
sourceVrgNamespaceName, sourceVrgName,
Expand Down
6 changes: 2 additions & 4 deletions internal/controller/vrg_volrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2010,9 +2010,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

var objectStore ObjectStorer

var s3StoreProfile ramendrv1alpha1.S3StoreProfile

objectStore, s3StoreProfile, err = v.reconciler.ObjStoreGetter.ObjectStore(
objectStore, _, err = v.reconciler.ObjStoreGetter.ObjectStore(
v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
if err != nil {
v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)
Expand Down Expand Up @@ -2046,7 +2044,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)

v.log.Info(fmt.Sprintf("Restored %d PVs and %d PVCs using profile %s", pvCount, pvcCount, s3ProfileName))

return pvCount + pvcCount, v.kubeObjectsRecover(result, s3StoreProfile, objectStore)
return pvCount + pvcCount, nil
}

if NoS3 {
Expand Down

0 comments on commit 1314716

Please sign in to comment.