diff --git a/e2e/dractions/actions.go b/e2e/dractions/actions.go
index 6e6168fb5..62cc67c72 100644
--- a/e2e/dractions/actions.go
+++ b/e2e/dractions/actions.go
@@ -64,6 +64,7 @@ func EnableProtection(w workloads.Workload, d deployers.Deployer) error {
 	if placement.Annotations == nil {
 		placement.Annotations = make(map[string]string)
 	}
+
 	placement.Annotations[OcmSchedulingDisable] = "true"
 
 	return updatePlacement(util.Ctx.Hub.CtrlClient, placement)
diff --git a/internal/controller/cel/cel_drclusters_test.go b/internal/controller/cel/cel_drclusters_test.go
index 10023fa41..da3bacd6f 100644
--- a/internal/controller/cel/cel_drclusters_test.go
+++ b/internal/controller/cel/cel_drclusters_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cel_test
 
 import (
diff --git a/internal/controller/cel/cel_drplacementcontrol_test.go b/internal/controller/cel/cel_drplacementcontrol_test.go
index 0c1756ded..3465f0ecc 100644
--- a/internal/controller/cel/cel_drplacementcontrol_test.go
+++ b/internal/controller/cel/cel_drplacementcontrol_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cel_test
 
 import (
diff --git a/internal/controller/cel/cel_drpolicy_test.go b/internal/controller/cel/cel_drpolicy_test.go
index 3c50818a3..c89cbad22 100644
--- a/internal/controller/cel/cel_drpolicy_test.go
+++ b/internal/controller/cel/cel_drpolicy_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cel_test
 
 import (
diff --git a/internal/controller/cephfscg/cephfscg_suite_test.go b/internal/controller/cephfscg/cephfscg_suite_test.go
index ad68b2316..b3b36bf9b 100644
--- a/internal/controller/cephfscg/cephfscg_suite_test.go
+++ b/internal/controller/cephfscg/cephfscg_suite_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg_test
 
 import (
diff --git a/internal/controller/cephfscg/cghandler.go b/internal/controller/cephfscg/cghandler.go
index e778bd098..eab0f8bbc 100644
--- a/internal/controller/cephfscg/cghandler.go
+++ b/internal/controller/cephfscg/cghandler.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg
 
 import (
diff --git a/internal/controller/cephfscg/cghandler_test.go b/internal/controller/cephfscg/cghandler_test.go
index cfa2228fb..af2472d32 100644
--- a/internal/controller/cephfscg/cghandler_test.go
+++ b/internal/controller/cephfscg/cghandler_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg_test
 
 import (
diff --git a/internal/controller/cephfscg/replicationgroupdestination.go b/internal/controller/cephfscg/replicationgroupdestination.go
index 2948261ca..fd76b7abd 100644
--- a/internal/controller/cephfscg/replicationgroupdestination.go
+++ b/internal/controller/cephfscg/replicationgroupdestination.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg
 
 import (
diff --git a/internal/controller/cephfscg/replicationgroupdestination_test.go b/internal/controller/cephfscg/replicationgroupdestination_test.go
index 8b8d67d93..a9e5fefc6 100644
--- a/internal/controller/cephfscg/replicationgroupdestination_test.go
+++ b/internal/controller/cephfscg/replicationgroupdestination_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg_test
 
 import (
diff --git a/internal/controller/cephfscg/replicationgroupsource.go b/internal/controller/cephfscg/replicationgroupsource.go
index 3cbeae7f2..477d3fbc3 100644
--- a/internal/controller/cephfscg/replicationgroupsource.go
+++ b/internal/controller/cephfscg/replicationgroupsource.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg
 
 import (
diff --git a/internal/controller/cephfscg/replicationgroupsource_test.go b/internal/controller/cephfscg/replicationgroupsource_test.go
index 5bf5fcae8..650e08f1a 100644
--- a/internal/controller/cephfscg/replicationgroupsource_test.go
+++ b/internal/controller/cephfscg/replicationgroupsource_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg_test
 
 import (
diff --git a/internal/controller/cephfscg/utils.go b/internal/controller/cephfscg/utils.go
index fb1c08ea0..9790ab32f 100644
--- a/internal/controller/cephfscg/utils.go
+++ b/internal/controller/cephfscg/utils.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg
 
 import (
diff --git a/internal/controller/cephfscg/volumegroupsourcehandler.go b/internal/controller/cephfscg/volumegroupsourcehandler.go
index 940b92e2c..07ea318d8 100644
--- a/internal/controller/cephfscg/volumegroupsourcehandler.go
+++ b/internal/controller/cephfscg/volumegroupsourcehandler.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg
 
 import (
diff --git a/internal/controller/cephfscg/volumegroupsourcehandler_test.go b/internal/controller/cephfscg/volumegroupsourcehandler_test.go
index a3caefb7b..b68fdf825 100644
--- a/internal/controller/cephfscg/volumegroupsourcehandler_test.go
+++ b/internal/controller/cephfscg/volumegroupsourcehandler_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package cephfscg_test
 
 import (
@@ -211,6 +214,7 @@ func CreateRS(rsName string) {
 func UpdateRS(rsName string) {
 	retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		rs := &volsyncv1alpha1.ReplicationSource{}
+
 		err := k8sClient.Get(context.TODO(), types.NamespacedName{
 			Name:      rsName,
 			Namespace: "default",
@@ -218,6 +222,7 @@ func UpdateRS(rsName string) {
 		if err != nil {
 			return err
 		}
+
 		rs.Status = &volsyncv1alpha1.ReplicationSourceStatus{
 			LastManualSync: manualString,
 		}
@@ -249,6 +254,7 @@ func GenerateReplicationGroupSource(
 func UpdateVGS(rgs *v1alpha1.ReplicationGroupSource, vsName, pvcName string) {
 	retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		volumeGroupSnapshot := &vgsv1alphfa1.VolumeGroupSnapshot{}
+
 		err := k8sClient.Get(context.TODO(), types.NamespacedName{
 			Name:      fmt.Sprintf(cephfscg.VolumeGroupSnapshotNameFormat, rgs.Name),
 			Namespace: rgs.Namespace,
@@ -256,6 +262,7 @@ func UpdateVGS(rgs *v1alpha1.ReplicationGroupSource, vsName, pvcName string) {
 		if err != nil {
 			return err
 		}
+
 		ready := true
 		volumeGroupSnapshot.Status = &vgsv1alphfa1.VolumeGroupSnapshotStatus{
 			ReadyToUse: &ready,
diff --git a/internal/controller/drplacementcontrol_controller.go b/internal/controller/drplacementcontrol_controller.go
index 37efe7c88..cc69c5ddc 100644
--- a/internal/controller/drplacementcontrol_controller.go
+++ b/internal/controller/drplacementcontrol_controller.go
@@ -1128,7 +1128,6 @@ func getVRGsFromManagedClusters(
 	vrgs := map[string]*rmn.VolumeReplicationGroup{}
 
 	annotations := make(map[string]string)
-
 	annotations[DRPCNameAnnotation] = drpc.Name
 	annotations[DRPCNamespaceAnnotation] = drpc.Namespace
 
@@ -1144,6 +1143,7 @@ func getVRGsFromManagedClusters(
 			// Only NotFound error is accepted
 			if errors.IsNotFound(err) {
 				log.Info(fmt.Sprintf("VRG not found on %q", drCluster.Name))
+
 				numClustersQueriedSuccessfully++
 
 				continue
diff --git a/internal/controller/util/cephfs_cg.go b/internal/controller/util/cephfs_cg.go
index 2cceaced4..bb830e664 100644
--- a/internal/controller/util/cephfs_cg.go
+++ b/internal/controller/util/cephfs_cg.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package util
 
 import (
diff --git a/internal/controller/util/cephfs_cg_test.go b/internal/controller/util/cephfs_cg_test.go
index dd83e962c..c175b2462 100644
--- a/internal/controller/util/cephfs_cg_test.go
+++ b/internal/controller/util/cephfs_cg_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package util_test
 
 import (
diff --git a/internal/controller/util/json_util.go b/internal/controller/util/json_util.go
index 1d3238222..88d9ad02f 100644
--- a/internal/controller/util/json_util.go
+++ b/internal/controller/util/json_util.go
@@ -1,9 +1,13 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package util
 
 import (
 	"context"
 	"fmt"
 	"reflect"
+	"regexp"
 	"strings"
 	"time"
 
@@ -11,71 +15,277 @@ import (
 	"github.com/ramendr/ramen/internal/controller/kubeobjects"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
-	"k8s.io/apimachinery/pkg/types"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/util/jsonpath"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 const (
 	defaultTimeoutValue       = 300
-	pollInterval              = 100
 	expectedNumberOfJSONPaths = 2
+	podType                   = "pod"
+	deploymentType            = "deployment"
+	statefulsetType           = "statefulset"
+	pInterval                 = 100
 )
 
-func getJSONObject(k8sClient client.Client, hook *kubeobjects.HookSpec) (client.Object, error) {
-	var obj client.Object
-
-	switch hook.SelectResource {
-	case "pod":
-		obj = &corev1.Pod{}
-	case "deployment":
-		obj = &appsv1.Deployment{}
-	case "statefulset":
-		obj = &appsv1.StatefulSet{}
-	default:
-		return obj, fmt.Errorf("unsupported resource type %s", hook.SelectResource)
+func EvaluateCheckHook(k8sClient client.Client, hook *kubeobjects.HookSpec, log logr.Logger) (bool, error) {
+	if hook.LabelSelector == nil && hook.NameSelector == "" {
+		return false, fmt.Errorf("either nameSelector or labelSelector should be provided to get resources")
 	}
 
-	err := k8sClient.Get(context.Background(),
-		types.NamespacedName{Name: hook.NameSelector, Namespace: hook.Namespace},
-		obj)
-
-	return obj, err
-}
-
-func EvaluateCheckHook(k8sClient client.Client, hook *kubeobjects.HookSpec, log logr.Logger) (bool, error) {
 	timeout := getTimeoutValue(hook)
+	pollInterval := pInterval * time.Millisecond
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
 	defer cancel()
 
-	ticker := time.NewTicker(pollInterval * time.Millisecond)
+	ticker := time.NewTicker(pollInterval)
 	defer ticker.Stop()
 
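+	// Poll until the outer context times out; when no matching resources are
+	// found yet, the poll interval doubles on each attempt, and the loop also
+	// exits once the backoff interval itself exceeds the timeout.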
-	for {
+	for int(pollInterval.Seconds()) < timeout {
 		select {
 		case <-ctx.Done():
 			return false, fmt.Errorf("timeout waiting for resource %s to be ready: %w", hook.NameSelector, ctx.Err())
 		case <-ticker.C:
-			obj, err := getJSONObject(k8sClient, hook)
+			objs, err := getResourcesList(k8sClient, hook)
 			if err != nil {
-				if k8serrors.IsNotFound(err) {
-					// Resource not found, continue polling
-					continue
-				}
-
 				return false, err // Some other error occurred, return it
 			}
 
-			res, err := EvaluateCheckHookExp(hook.Chk.Condition, obj)
-			if err != nil {
-				continue // This may mean that expression is not evaluated
+			if len(objs) == 0 {
+				pollInterval *= 2
+				ticker.Reset(pollInterval)
+
+				continue
 			}
 
-			return res, nil
+			return EvaluateCheckHookForObjects(objs, hook, log)
 		}
 	}
+
+	return false, nil
+}
+
+func EvaluateCheckHookForObjects(objs []client.Object, hook *kubeobjects.HookSpec, log logr.Logger) (bool, error) {
+	finalRes := true
+
+	for _, obj := range objs {
+		res, err := EvaluateCheckHookExp(hook.Chk.Condition, obj)
+		if err != nil {
+			log.Info("error executing check hook", "for", hook.Name, "with error", err)
+
+			return false, err
+		}
+
+		finalRes = finalRes && res
+
+		log.Info("check hook executed for", "hook", hook.Name, "resource type", hook.SelectResource, "with object name",
+			obj.GetName(), "in ns", obj.GetNamespace(), "with execution result", res)
+	}
+
+	return finalRes, nil
+}
+
+func getResourcesList(k8sClient client.Client, hook *kubeobjects.HookSpec) ([]client.Object, error) {
+	resourceList := make([]client.Object, 0)
+
+	var objList client.ObjectList
+
+	switch hook.SelectResource {
+	case podType:
+		objList = &corev1.PodList{}
+	case deploymentType:
+		objList = &appsv1.DeploymentList{}
+	case statefulsetType:
+		objList = &appsv1.StatefulSetList{}
+	default:
+		return resourceList, fmt.Errorf("unsupported resource type %s", hook.SelectResource)
+	}
+
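+	// Both selectors may be set; the objects matched by the name selector and
+	// by the label selector are appended to the same list, so an object that
+	// matches both can appear twice.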
+	if hook.NameSelector != "" {
+		objsUsingNameSelector, err := getResourcesUsingNameSelector(k8sClient, hook, objList)
+		if err != nil {
+			return resourceList, err
+		}
+
+		resourceList = append(resourceList, objsUsingNameSelector...)
+	}
+
+	if hook.LabelSelector != nil {
+		objsUsingLabelSelector, err := getResourcesUsingLabelSelector(k8sClient, hook, objList)
+		if err != nil {
+			return resourceList, err
+		}
+
+		resourceList = append(resourceList, objsUsingLabelSelector...)
+	}
+
+	return resourceList, nil
+}
+
+func getResourcesUsingLabelSelector(c client.Client, hook *kubeobjects.HookSpec,
+	objList client.ObjectList,
+) ([]client.Object, error) {
+	filteredObjs := make([]client.Object, 0)
+
+	selector, err := metav1.LabelSelectorAsSelector(hook.LabelSelector)
+	if err != nil {
+		return filteredObjs, fmt.Errorf("error converting labelSelector to selector: %w", err)
+	}
+
+	listOps := &client.ListOptions{
+		Namespace:     hook.Namespace,
+		LabelSelector: selector,
+	}
+
+	err = c.List(context.Background(), objList, listOps)
+	if err != nil {
+		return filteredObjs, err
+	}
+
+	return getObjectsBasedOnType(objList), nil
+}
+
+func getResourcesUsingNameSelector(c client.Client, hook *kubeobjects.HookSpec,
+	objList client.ObjectList,
+) ([]client.Object, error) {
+	filteredObjs := make([]client.Object, 0)
+
+	var err error
+
+	if isValidK8sName(hook.NameSelector) {
+		// use nameSelector as an exact-match field selector
+		listOps := &client.ListOptions{
+			Namespace: hook.Namespace,
+			FieldSelector: fields.SelectorFromSet(fields.Set{
+				"metadata.name": hook.NameSelector, // needs exact matching with the name
+			}),
+		}
+
+		err = c.List(context.Background(), objList, listOps)
+		if err != nil {
+			return filteredObjs, err
+		}
+
+		return getObjectsBasedOnType(objList), nil
+	} else if isValidRegex(hook.NameSelector) {
+		// list without a field selector, then filter the names with the regex
+		listOps := &client.ListOptions{
+			Namespace: hook.Namespace,
+		}
+
+		re, err := regexp.Compile(hook.NameSelector)
+		if err != nil {
+			return filteredObjs, err
+		}
+
+		err = c.List(context.Background(), objList, listOps)
+		if err != nil {
+			return filteredObjs, err
+		}
+
+		return getObjectsBasedOnTypeAndRegex(objList, re), nil
+	}
+
+	return filteredObjs, fmt.Errorf("nameSelector is neither a distinct name nor a regex")
+}
+
+// getObjectsBasedOnType returns the items of the typed list as a slice of
+// client.Object.
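+// The per-item copy (podCopy := pod, and so on below) is deliberate:
+// appending the address of the range variable itself would make every slice
+// entry point at the same variable in Go versions before 1.22.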
+func getObjectsBasedOnType(objList client.ObjectList) []client.Object {
+	objs := make([]client.Object, 0)
+
+	switch v := objList.(type) {
+	case *corev1.PodList:
+		for _, pod := range v.Items {
+			podCopy := pod
+			objs = append(objs, &podCopy)
+		}
+	case *appsv1.DeploymentList:
+		for _, dep := range v.Items {
+			depCopy := dep
+			objs = append(objs, &depCopy)
+		}
+	case *appsv1.StatefulSetList:
+		for _, ss := range v.Items {
+			ssCopy := ss
+			objs = append(objs, &ssCopy)
+		}
+	}
+
+	return objs
+}
+
+// getObjectsBasedOnTypeAndRegex returns the items of the typed list whose
+// names match the regex, as a slice of client.Object.
+func getObjectsBasedOnTypeAndRegex(objList client.ObjectList, re *regexp.Regexp) []client.Object {
+	objs := make([]client.Object, 0)
+
+	switch v := objList.(type) {
+	case *corev1.PodList:
+		objs = getMatchingPods(v, re)
+	case *appsv1.DeploymentList:
+		objs = getMatchingDeployments(v, re)
+	case *appsv1.StatefulSetList:
+		objs = getMatchingStatefulSets(v, re)
+	}
+
+	return objs
+}
+
+func getMatchingPods(pList *corev1.PodList, re *regexp.Regexp) []client.Object {
+	objs := make([]client.Object, 0)
+
+	for _, pod := range pList.Items {
+		if re.MatchString(pod.Name) {
+			podCopy := pod
+			objs = append(objs, &podCopy)
+		}
+	}
+
+	return objs
+}
+
+func getMatchingDeployments(dList *appsv1.DeploymentList, re *regexp.Regexp) []client.Object {
+	objs := make([]client.Object, 0)
+
+	for _, dep := range dList.Items {
+		if re.MatchString(dep.Name) {
+			depCopy := dep
+			objs = append(objs, &depCopy)
+		}
+	}
+
+	return objs
+}
+
+func getMatchingStatefulSets(ssList *appsv1.StatefulSetList, re *regexp.Regexp) []client.Object {
+	objs := make([]client.Object, 0)
+
+	for _, ss := range ssList.Items {
+		if re.MatchString(ss.Name) {
+			ssCopy := ss
+			objs = append(objs, &ssCopy)
+		}
+	}
+
+	return objs
+}
+
+func isValidK8sName(nameSelector string) bool {
+	regex := `^[a-z0-9]([a-z0-9.-]*[a-z0-9])?$`
+	re := regexp.MustCompile(regex)
+
+	return re.MatchString(nameSelector)
+}
+
+func isValidRegex(nameSelector string) bool {
+	_, err := regexp.Compile(nameSelector)
+
+	return err == nil
 }
 
 func getTimeoutValue(hook *kubeobjects.HookSpec) int {
diff --git a/internal/controller/util/json_util_test.go b/internal/controller/util/json_util_test.go
index 3a481d361..166fb8c58 100644
--- a/internal/controller/util/json_util_test.go
+++ b/internal/controller/util/json_util_test.go
@@ -1,3 +1,6 @@
+// SPDX-FileCopyrightText: The RamenDR authors
+// SPDX-License-Identifier: Apache-2.0
+
 package util_test
 
 import (
@@ -11,9 +14,10 @@ import (
 type testCases struct {
 	jsonPathExprs string
 	result        bool
+	jsonText      []byte
 }
 
-var jsonText1 = []byte(`{
+var jsonDeployment = []byte(`{
 	"kind": "Deployment",
 	"spec": {
 		"progressDeadlineSeconds": 600,
 		"replicas": 1,
 		"revisionHistoryLimit": 10
 	},
 	"status": {
 		"replicas": 1,
 		"conditions": [
 			{
 				"status": "True",
 				"type": "Progressing"
 			},
 			{
 				"status": "True",
 				"type": "Available"
 			}
 		]
 	}
 	}`)
@@ -35,31 +39,80 @@
+var jsonPod = []byte(`{
+	"kind": "Pod",
+	"spec": {
+		"progressDeadlineSeconds": 600,
+		"replicas": 1,
+		"revisionHistoryLimit": 10
+	},
+	"status": {
+		"replicas": 1,
+		"conditions": [
+			{
+				"status": "True",
+				"type": "Progressing"
+			},
+			{
+				"status": "True",
+				"type": "Available"
+			}
+		]
+	}
+	}`)
+
+var jsonStatefulset = []byte(`{
+	"kind": "Statefulset",
+	"spec": {
+		"progressDeadlineSeconds": 600,
+		"replicas": 1,
+		"revisionHistoryLimit": 10
+	},
+	"status": {
+		"replicas": 1,
+		"conditions": [
+			{
+				"status": "True",
+				"type": "Progressing"
+			},
+			{
+				"status": "True",
+				"type": "Available"
+			}
+		]
+	}
+	}`)
+
 var testCasesData = []testCases{
 	{
 		jsonPathExprs: "{$.status.conditions[0].status} == True",
 		result:        true,
+		jsonText:      jsonDeployment,
 	},
 	{
 		jsonPathExprs: "{$.spec.replicas} == 1",
 		result:        true,
+		jsonText:      jsonPod,
 	},
 	{
 		jsonPathExprs: "{$.status.conditions[0].status} == {True}",
 		result:        false,
+		jsonText:      jsonStatefulset,
 	},
 }
 
-func TestXYZ(t *testing.T) {
+func TestEvaluateCheckHookExp(t *testing.T) {
 	for i, tt := range testCasesData {
 		test := tt
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
 			var jsonData map[string]interface{}
-			err := json.Unmarshal(jsonText1, &jsonData)
+
+			err := json.Unmarshal(test.jsonText, &jsonData)
 			if err != nil {
 				t.Error(err)
 			}
+
 			_, err = util.EvaluateCheckHookExp(test.jsonPathExprs, jsonData)
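+			// result encodes whether the expression evaluates without error,
+			// not the boolean value the expression yields.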
 			if (err == nil) != test.result {
 				t.Errorf("EvaluateCheckHookExp() = %v, want %v", err, test.result)
diff --git a/internal/controller/volsync/vshandler.go b/internal/controller/volsync/vshandler.go
index 9e4b2f1a8..03b1f9b1e 100644
--- a/internal/controller/volsync/vshandler.go
+++ b/internal/controller/volsync/vshandler.go
@@ -459,6 +459,7 @@ func (v *VSHandler) createOrUpdateRS(rsSpec ramendrv1alpha1.VolSyncReplicationSo
 		return err
 	}
+
 	rs.Spec.Trigger = &volsyncv1alpha1.ReplicationSourceTriggerSpec{
 		Schedule: scheduleCronSpec,
 	}
@@ -1181,6 +1182,7 @@ func (v *VSHandler) ensurePVCFromSnapshot(rdSpec ramendrv1alpha1.VolSyncReplicat
 		return nil
 	}
+
 	if pvc.Status.Phase == corev1.ClaimBound {
 		// PVC already bound at this point
 		l.V(1).Info("PVC already bound")
@@ -1367,6 +1369,7 @@ func (v *VSHandler) ModifyRSSpecForCephFS(rsSpec *ramendrv1alpha1.VolSyncReplica
 		readOnlyPVCStorageClass.Provisioner = storageClass.Provisioner
 
 		// Copy other parameters from the original storage class
+		// Note: not copying volumeBindingMode or reclaimPolicy from the source StorageClass leaves the defaults
 		readOnlyPVCStorageClass.Parameters = map[string]string{}
 		for k, v := range storageClass.Parameters {
 			readOnlyPVCStorageClass.Parameters[k] = v
@@ -1374,8 +1377,6 @@ func (v *VSHandler) ModifyRSSpecForCephFS(rsSpec *ramendrv1alpha1.VolSyncReplica
 		// Set backingSnapshot parameter to true
 		readOnlyPVCStorageClass.Parameters["backingSnapshot"] = "true"
-
-		// Note - not copying volumebindingmode or reclaim policy from the source storageclass will leave defaults
 	}
 
 	return nil
diff --git a/internal/controller/vrg_kubeobjects.go b/internal/controller/vrg_kubeobjects.go
index 5029943b2..43722bd57 100644
--- a/internal/controller/vrg_kubeobjects.go
+++ b/internal/controller/vrg_kubeobjects.go
@@ -260,6 +260,7 @@ func (v *VRGInstance) kubeObjectsCaptureStartOrResume(
 		captureInProgressStatusUpdate,
 		labels, annotations, requests, log,
 	)
+
 	requestsProcessedCount += len(v.s3StoreAccessors)
 	if requestsCompletedCount < requestsProcessedCount {
 		log1.Info("Kube objects group capturing", "complete", requestsCompletedCount, "total", requestsProcessedCount)
@@ -354,6 +355,7 @@ func (v *VRGInstance) kubeObjectsGroupCapture(
 		if err == nil {
 			log1.Info("Kube objects group captured", "start", request.StartTime(), "end", request.EndTime())
+
 			requestsCompletedCount++
 
 			continue
@@ -493,76 +495,60 @@ func (v *VRGInstance) kubeObjectsCaptureStatus(status metav1.ConditionStatus, re
 	}
 }
 
-func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result,
-	s3StoreProfile ramen.S3StoreProfile, objectStorer ObjectStorer,
-) error {
-	if v.kubeObjectProtectionDisabled("recovery") {
-		return nil
-	}
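+// getVRGFromS3Profile downloads the VRG that was previously uploaded to the
+// store behind the named s3 profile; recovery uses it to find the capture
+// identifier to recover from.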
+func (v *VRGInstance) getVRGFromS3Profile(s3ProfileName string) (*ramen.VolumeReplicationGroup, error) {
+	pathName := s3PathNamePrefix(v.instance.Namespace, v.instance.Name)
 
-	localS3StoreAccessor, err := v.findS3StoreAccessor(s3StoreProfile)
+	objectStore, _, err := v.reconciler.ObjStoreGetter.ObjectStore(
+		v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
 	if err != nil {
-		return err
+		return nil, fmt.Errorf("object store inaccessible for profile %v: %v", s3ProfileName, err)
 	}
 
-	vrg := v.instance
-	sourceVrgNamespaceName, sourceVrgName := vrg.Namespace, vrg.Name
-	sourcePathNamePrefix := s3PathNamePrefix(sourceVrgNamespaceName, sourceVrgName)
+	vrg := &ramen.VolumeReplicationGroup{}
+	if err := vrgObjectDownload(objectStore, pathName, vrg); err != nil {
+		return nil, fmt.Errorf("vrg download failed, vrg namespace: %v, vrg name: %v, s3Profile: %v, error: %v",
+			v.instance.Namespace, v.instance.Name, s3ProfileName, err)
+	}
 
-	sourceVrg := &ramen.VolumeReplicationGroup{}
-	if err := vrgObjectDownload(objectStorer, sourcePathNamePrefix, sourceVrg); err != nil {
-		v.log.Error(err, "Kube objects capture-to-recover-from identifier get error")
+	return vrg, nil
+}
 
+func (v *VRGInstance) kubeObjectsRecover(result *ctrl.Result, s3ProfileName string) error {
+	if v.kubeObjectProtectionDisabled("recovery") {
 		return nil
 	}
 
-	captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
-	if captureToRecoverFromIdentifier == nil {
-		v.log.Info("Kube objects capture-to-recover-from identifier nil")
+	if v.instance.Spec.Action == "" {
+		v.log.Info("Skipping kube objects restore in fresh deployment case")
 
 		return nil
 	}
 
-	vrg.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
-	veleroNamespaceName := v.veleroNamespaceName()
-	labels := util.OwnerLabels(vrg)
-	log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", localS3StoreAccessor.S3ProfileName)
-
-	captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
-		v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
+	sourceVrg, err := v.getVRGFromS3Profile(s3ProfileName)
 	if err != nil {
-		log.Error(err, "Kube objects capture requests query error")
-
-		return err
+		return fmt.Errorf("kube objects source VRG get error: %v", err)
 	}
 
-	recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
-		v.ctx, v.reconciler.APIReader, veleroNamespaceName, labels)
-	if err != nil {
-		log.Error(err, "Kube objects recover requests query error")
-
-		return err
+	captureToRecoverFromIdentifier := sourceVrg.Status.KubeObjectProtection.CaptureToRecoverFrom
+	if captureToRecoverFromIdentifier == nil {
+		return fmt.Errorf("kube objects source VRG capture-to-recover-from identifier nil")
 	}
 
-	return v.kubeObjectsRecoveryStartOrResume(
-		result,
-		s3StoreAccessor{objectStorer, localS3StoreAccessor.S3StoreProfile},
-		sourceVrgNamespaceName, sourceVrgName, captureToRecoverFromIdentifier,
-		kubeobjects.RequestsMapKeyedByName(captureRequestsStruct),
-		kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct),
-		veleroNamespaceName, labels, log,
-	)
+	v.instance.Status.KubeObjectProtection.CaptureToRecoverFrom = captureToRecoverFromIdentifier
+	log := v.log.WithValues("number", captureToRecoverFromIdentifier.Number, "profile", s3ProfileName)
+
+	return v.kubeObjectsRecoveryStartOrResume(result, s3ProfileName, captureToRecoverFromIdentifier, log)
 }
 
-func (v *VRGInstance) findS3StoreAccessor(s3StoreProfile ramen.S3StoreProfile) (s3StoreAccessor, error) {
+func (v *VRGInstance) findS3StoreAccessor(s3ProfileName string) (s3StoreAccessor, error) {
 	for _, s3StoreAccessor := range v.s3StoreAccessors {
-		if s3StoreAccessor.S3StoreProfile.S3ProfileName == s3StoreProfile.S3ProfileName {
+		if s3StoreAccessor.S3StoreProfile.S3ProfileName == s3ProfileName {
 			return s3StoreAccessor, nil
 		}
 	}
 
 	return s3StoreAccessor{},
-		fmt.Errorf("s3StoreProfile (%s) not found in s3StoreAccessor list", s3StoreProfile.S3ProfileName)
+		fmt.Errorf("s3StoreProfile (%s) not found in s3StoreAccessor list", s3ProfileName)
 }
 
 func (v *VRGInstance) getRecoverOrProtectRequest(
@@ -570,7 +556,7 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
 	s3StoreAccessor s3StoreAccessor, sourceVrgNamespaceName, sourceVrgName string,
 	captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier, groupNumber int,
 	recoverGroup kubeobjects.RecoverSpec,
-	veleroNamespaceName string, labels map[string]string, log logr.Logger,
+	labels map[string]string, log logr.Logger,
 ) (kubeobjects.Request, bool, func() (kubeobjects.Request, error), func(kubeobjects.Request)) {
 	vrg := v.instance
 	annotations := map[string]string{}
@@ -589,7 +575,7 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
 				s3StoreAccessor.S3CompatibleEndpoint, s3StoreAccessor.S3Bucket, s3StoreAccessor.S3Region,
 				pathName, s3StoreAccessor.VeleroNamespaceSecretKeyRef, s3StoreAccessor.CACertificates,
-				recoverGroup.Spec, veleroNamespaceName,
+				recoverGroup.Spec, v.veleroNamespaceName(),
 				captureName, labels, annotations)
 		},
@@ -613,7 +599,7 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
 				s3StoreAccessor.S3CompatibleEndpoint, s3StoreAccessor.S3Bucket, s3StoreAccessor.S3Region,
 				pathName, s3StoreAccessor.VeleroNamespaceSecretKeyRef, s3StoreAccessor.CACertificates,
-				recoverGroup, veleroNamespaceName,
+				recoverGroup, v.veleroNamespaceName(),
 				captureName, captureRequest, recoverName,
 				labels, annotations)
@@ -625,16 +611,51 @@ func (v *VRGInstance) getRecoverOrProtectRequest(
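+// getCaptureRequests and getRecoverRequests query the Velero requests
+// labelled as owned by this VRG and return them keyed by name, so an
+// in-progress recovery can be resumed rather than restarted.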
+func (v *VRGInstance) getCaptureRequests() (map[string]kubeobjects.Request, error) {
+	captureRequestsStruct, err := v.reconciler.kubeObjects.ProtectRequestsGet(
+		v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
+	if err != nil {
+		return nil, fmt.Errorf("kube objects capture requests query error: %v", err)
+	}
+
+	return kubeobjects.RequestsMapKeyedByName(captureRequestsStruct), nil
+}
+
+func (v *VRGInstance) getRecoverRequests() (map[string]kubeobjects.Request, error) {
+	recoverRequestsStruct, err := v.reconciler.kubeObjects.RecoverRequestsGet(
+		v.ctx, v.reconciler.APIReader, v.veleroNamespaceName(), util.OwnerLabels(v.instance))
+	if err != nil {
+		return nil, fmt.Errorf("kube objects recover requests query error: %v", err)
+	}
+
+	return kubeobjects.RequestsMapKeyedByName(recoverRequestsStruct), nil
+}
+
 func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
-	result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
-	sourceVrgNamespaceName, sourceVrgName string,
+	result *ctrl.Result, s3ProfileName string,
 	captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
-	captureRequests, recoverRequests map[string]kubeobjects.Request,
-	veleroNamespaceName string, labels map[string]string, log logr.Logger,
+	log logr.Logger,
 ) error {
+	labels := util.OwnerLabels(v.instance)
+
+	captureRequests, err := v.getCaptureRequests()
+	if err != nil {
+		return err
+	}
+
+	recoverRequests, err := v.getRecoverRequests()
+	if err != nil {
+		return err
+	}
+
 	groups := v.recipeElements.RecoverWorkflow
 	requests := make([]kubeobjects.Request, len(groups))
 
+	s3StoreAccessor, err := v.findS3StoreAccessor(s3ProfileName)
+	if err != nil {
+		return fmt.Errorf("kube objects recovery couldn't build s3StoreAccessor: %v", err)
+	}
+
 	for groupNumber, recoverGroup := range groups {
 		rg := recoverGroup
 		log1 := log.WithValues("group", groupNumber, "name", rg.BackupName)
@@ -644,9 +665,9 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
 				return fmt.Errorf("check hook execution failed during restore %s: %v", rg.Hook.Name, err)
 			}
 		} else {
-			if err := v.executeRecoverGroup(result, s3StoreAccessor, sourceVrgNamespaceName,
-				sourceVrgName, captureToRecoverFromIdentifier, captureRequests,
-				recoverRequests, veleroNamespaceName, labels, groupNumber, rg,
+			if err := v.executeRecoverGroup(result, s3StoreAccessor,
+				captureToRecoverFromIdentifier, captureRequests,
+				recoverRequests, labels, groupNumber, rg,
 				requests, log1); err != nil {
 				return err
 			}
@@ -657,21 +678,22 @@ func (v *VRGInstance) kubeObjectsRecoveryStartOrResume(
 	duration := time.Since(startTime.Time)
 	log.Info("Kube objects recovered", "groups", len(groups), "start", startTime, "duration", duration)
 
-	return v.kubeObjectsRecoverRequestsDelete(result, veleroNamespaceName, labels)
+	return v.kubeObjectsRecoverRequestsDelete(result, v.veleroNamespaceName(), labels)
 }
 
 func (v *VRGInstance) executeRecoverGroup(result *ctrl.Result, s3StoreAccessor s3StoreAccessor,
-	sourceVrgNamespaceName, sourceVrgName string,
 	captureToRecoverFromIdentifier *ramen.KubeObjectsCaptureIdentifier,
 	captureRequests, recoverRequests map[string]kubeobjects.Request,
-	veleroNamespaceName string, labels map[string]string, groupNumber int,
+	labels map[string]string, groupNumber int,
 	rg kubeobjects.RecoverSpec, requests []kubeobjects.Request,
 	log1 logr.Logger,
 ) error {
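+	// Recovered kube objects originate from this VRG's own name and namespace.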
+	sourceVrgName := v.instance.Name
+	sourceVrgNamespaceName := v.instance.Namespace
 	request, ok, submit, cleanup := v.getRecoverOrProtectRequest(
 		captureRequests, recoverRequests, s3StoreAccessor,
 		sourceVrgNamespaceName, sourceVrgName, captureToRecoverFromIdentifier,
-		groupNumber, rg, veleroNamespaceName, labels, log1,
+		groupNumber, rg, labels, log1,
 	)
 
 	var err error
diff --git a/internal/controller/vrg_volrep.go b/internal/controller/vrg_volrep.go
index 3ebd81e1b..701c5c212 100644
--- a/internal/controller/vrg_volrep.go
+++ b/internal/controller/vrg_volrep.go
@@ -892,6 +892,7 @@ func (v *VRGInstance) reconcileVRForDeletion(pvc *corev1.PersistentVolumeClaim,
 		}
 	} else {
 		requeueResult, ready, err := v.processVRAsPrimary(pvcNamespacedName, pvc, log)
+
 		switch {
 		case err != nil:
 			log.Info("Requeuing due to failure in getting or creating VolumeReplication resource for PersistentVolumeClaim",
@@ -2009,9 +2010,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)
 
 		var objectStore ObjectStorer
 
-		var s3StoreProfile ramendrv1alpha1.S3StoreProfile
-
-		objectStore, s3StoreProfile, err = v.reconciler.ObjStoreGetter.ObjectStore(
+		objectStore, _, err = v.reconciler.ObjStoreGetter.ObjectStore(
 			v.ctx, v.reconciler.APIReader, s3ProfileName, v.namespacedName, v.log)
 		if err != nil {
 			v.log.Error(err, "Kube objects recovery object store inaccessible", "profile", s3ProfileName)
@@ -2045,7 +2044,7 @@ func (v *VRGInstance) restorePVsAndPVCsFromS3(result *ctrl.Result) (int, error)
 
 		v.log.Info(fmt.Sprintf("Restored %d PVs and %d PVCs using profile %s", pvCount, pvcCount, s3ProfileName))
 
-		return pvCount + pvcCount, v.kubeObjectsRecover(result, s3StoreProfile, objectStore)
+		return pvCount + pvcCount, v.kubeObjectsRecover(result, s3ProfileName)
 	}
 
 	if NoS3 {
diff --git a/internal/controller/vrg_volrep_test.go b/internal/controller/vrg_volrep_test.go
index 289f6386d..fbb2df47d 100644
--- a/internal/controller/vrg_volrep_test.go
+++ b/internal/controller/vrg_volrep_test.go
@@ -2638,6 +2638,7 @@ func (v *vrgTest) promoteVolRepsAndDo(options promoteOptions, do func(int, int))
 		if !options.ValidatedMissing {
 			v.waitForVolRepCondition(volrepKey, volrep.ConditionValidated, metav1.ConditionTrue)
 		}
+
 		v.waitForVolRepCondition(volrepKey, volrep.ConditionCompleted, metav1.ConditionTrue)
 		v.waitForProtectedPVCs(volrepKey)
 	}