Skip to content

Commit

Permalink
Fix syncing the aggregated snapshot size between CNSVolumeInfo and CN…
Browse files Browse the repository at this point in the history
…S query result

Signed-off-by: Deepak Kinni <[email protected]>
  • Loading branch information
deepakkinni committed Sep 30, 2024
1 parent f7c56ae commit f35c78d
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 126 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch/v5 v5.6.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-logr/zapr v1.2.4
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.6.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.35.24/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -675,12 +676,16 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 h1:+FNtrFTmVw0YZGpBGX56XDee331t6JAXeK2bcyhLOOc=
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0Hg7FvpRQsQh5OSqIylirxKC7o=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/cns-lib/vsphere/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func GetCandidateDatastoresInCluster(ctx context.Context, vc *VirtualCenter, clu
if len(sharedDatastores) == 0 && len(vsanDirectDatastores) == 0 {
return nil, nil, fmt.Errorf("no candidates datastores found in the Kubernetes cluster")
}
log.Infof("Found shared datastores: %+v and vSAN Direct datastores: %+v", sharedDatastores,
log.Debugf("Found shared datastores: %+v and vSAN Direct datastores: %+v", sharedDatastores,
vsanDirectDatastores)
return sharedDatastores, vsanDirectDatastores, nil
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func GetKubeConfig(ctx context.Context) (*restclient.Config, error) {
var err error
kubecfgPath := getKubeConfigPath(ctx)
if kubecfgPath != "" {
log.Infof("k8s client using kubeconfig from %s", kubecfgPath)
log.Debugf("k8s client using kubeconfig from %s", kubecfgPath)
config, err = clientcmd.BuildConfigFromFlags("", kubecfgPath)
if err != nil {
log.Errorf("BuildConfigFromFlags failed %v", err)
return nil, err
}
} else {
log.Info("k8s client using in-cluster config")
log.Debug("k8s client using in-cluster config")
config, err = restclient.InClusterConfig()
if err != nil {
log.Errorf("InClusterConfig failed %v", err)
Expand All @@ -102,24 +102,24 @@ func getKubeConfigPath(ctx context.Context) string {
flagValue := kubecfgFlag.Value.String()
if flagValue != "" {
kubecfgPath = flagValue
log.Infof("Kubeconfig path obtained from kubeconfig flag: %q", kubecfgPath)
log.Debugf("Kubeconfig path obtained from kubeconfig flag: %q", kubecfgPath)
} else {
log.Info("Kubeconfig flag is set but empty, checking environment variable value")
log.Debug("Kubeconfig flag is set but empty, checking environment variable value")
}
} else {
log.Info("Kubeconfig flag not set, checking environment variable value")
log.Debug("Kubeconfig flag not set, checking environment variable value")
}
if kubecfgPath == "" {
// Get the Kubeconfig path from the environment variable
kubecfgPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
log.Infof("Kubeconfig path obtained from environment variable %q: %q",
log.Debugf("Kubeconfig path obtained from environment variable %q: %q",
clientcmd.RecommendedConfigPathEnvVar, kubecfgPath)
}
// Final logging of the Kubeconfig path used
if kubecfgPath == "" {
log.Info("No Kubeconfig path found, either from environment variable or flag")
log.Debug("No Kubeconfig path found, either from environment variable or flag")
} else {
log.Infof("Final Kubeconfig path used: %q", kubecfgPath)
log.Debugf("Final Kubeconfig path used: %q", kubecfgPath)
}
return kubecfgPath
}
Expand Down Expand Up @@ -431,7 +431,7 @@ func getClientThroughput(ctx context.Context, isSupervisorClient bool) (float32,
burst = value
}
}
log.Infof("Setting client QPS to %f and Burst to %d.", qps, burst)
log.Debugf("Setting client QPS to %f and Burst to %d.", qps, burst)
return qps, burst
}

Expand Down
111 changes: 65 additions & 46 deletions pkg/syncer/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -314,12 +315,7 @@ func CsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer, vc s
cnsVolumeMap[vol.VolumeId.Id] = vol
}
log.Infof("calling validateAndCorrectVolumeInfoSnapshotDetails with %d volumes", len(cnsVolumeMap))
err = validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
if err != nil {
log.Errorf("FullSync for VC %s: Error while sync CNSVolumeinfo snapshot details, failed with err=%+v",
vc, err.Error())
return err
}
validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
}
vcHostObj, vcHostObjFound := metadataSyncer.configInfo.Cfg.VirtualCenter[vc]
if !vcHostObjFound {
Expand Down Expand Up @@ -767,9 +763,11 @@ func volumeInfoCRFullSync(ctx context.Context, metadataSyncer *metadataSyncInfor
// in cns with the size in cnsvolumeinfo. if found discrepancy in order to correct the values
// update the cnsvolumeinfo.
func validateAndCorrectVolumeInfoSnapshotDetails(ctx context.Context,
cnsVolumeMap map[string]cnstypes.CnsVolume) error {
cnsVolumeMap map[string]cnstypes.CnsVolume) {
log := logger.GetLogger(ctx)
volumeInfoCRList := volumeInfoService.ListAllVolumeInfos()
alreadySyncedVolumeCount := 0
outOfSyncVolumeCount := 0
for _, volumeInfo := range volumeInfoCRList {
cnsvolumeinfo := &cnsvolumeinfov1alpha1.CNSVolumeInfo{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(volumeInfo.(*unstructured.Unstructured).Object,
Expand All @@ -779,54 +777,75 @@ func validateAndCorrectVolumeInfoSnapshotDetails(ctx context.Context,
continue
}
if cnsVol, ok := cnsVolumeMap[cnsvolumeinfo.Spec.VolumeID]; ok {
log.Infof("validate volume info for storage details for volume %s", cnsVol.VolumeId.Id)
var aggregatedSnapshotCapacity int64
log.Debugf("validate volume info for storage details for volume %s", cnsVol.VolumeId.Id)
var aggregatedSnapshotCapacityInMB int64
if cnsVol.BackingObjectDetails != nil &&
cnsVol.BackingObjectDetails.(*cnstypes.CnsBlockBackingDetails) != nil {
val, ok := cnsVol.BackingObjectDetails.(*cnstypes.CnsBlockBackingDetails)
if ok {
aggregatedSnapshotCapacity = val.AggregatedSnapshotCapacityInMb
aggregatedSnapshotCapacityInMB = val.AggregatedSnapshotCapacityInMb
}
log.Infof("Received aggregatedSnapshotCapacity %d for volume %q",
aggregatedSnapshotCapacity, cnsVol.VolumeId.Id)
if cnsvolumeinfo.Spec.AggregatedSnapshotSize == nil || aggregatedSnapshotCapacity !=
cnsvolumeinfo.Spec.AggregatedSnapshotSize.Value() {
// use current time as snapshot completion time is not available in fullsync.
log.Infof("Update aggregatedSnapshotCapacity for volume %q", cnsVol.VolumeId.Id)
currentTime := time.Now()
cnsSnapInfo := &volumes.CnsSnapshotInfo{
SourceVolumeID: cnsvolumeinfo.Spec.VolumeID,
SnapshotLatestOperationCompleteTime: time.Now(),
AggregatedSnapshotCapacityInMb: aggregatedSnapshotCapacity,
}
log.Infof("unable to get snapshot operation completion time for volumeID %q "+
"will use current time %v instead", cnsvolumeinfo.Spec.VolumeID, currentTime)
patch, err := common.GetValidatedCNSVolumeInfoPatch(ctx, cnsSnapInfo)
if err != nil {
log.Errorf("unable to get VolumeInfo patch for %q. Error: %+v",
cnsvolumeinfo.Spec.VolumeID, err)
return err
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Errorf("error while create VolumeInfo patch for volume %q. Error while marshaling: %+v",
cnsvolumeinfo.Spec.VolumeID, err)
return err
}
err = volumeInfoService.PatchVolumeInfo(ctx, cnsvolumeinfo.Spec.VolumeID, patchBytes,
allowedRetriesToPatchCNSVolumeInfo)
if err != nil {
log.Errorf("failed to patch CNSVolumeInfo instance to update snapshot details."+
"for volume %q. Error: %+v", cnsvolumeinfo.Spec.VolumeID, err)
return err
log.Debugf("Received aggregatedSnapshotCapacity %dMB for volume %s from CNS",
aggregatedSnapshotCapacityInMB, cnsVol.VolumeId.Id)
if cnsvolumeinfo.Spec.AggregatedSnapshotSize != nil && cnsvolumeinfo.Spec.ValidAggregatedSnapshotSize {
aggregatedSnapshotCapacity := resource.NewQuantity(aggregatedSnapshotCapacityInMB*common.MbInBytes,
resource.BinarySI)
if aggregatedSnapshotCapacity.Value() == cnsvolumeinfo.Spec.AggregatedSnapshotSize.Value() {
log.Debugf("volume %s Aggregated Snapshot capacity %s in CnsVolumeInfo is in sync with CNS",
cnsVol.VolumeId.Id, aggregatedSnapshotCapacity.String())
alreadySyncedVolumeCount++
continue
}
log.Infof("Updated CNSvolumeInfo with Snapshot details successfully for volume %q",
cnsvolumeinfo.Spec.VolumeID)
log.Infof("Aggregated Snapshot size mismatch for volume %s, %s in CnsVolumeInfo and %dMB in CNS",
cnsVol.VolumeId.Id, cnsvolumeinfo.Spec.AggregatedSnapshotSize.String(),
aggregatedSnapshotCapacityInMB)
}
// Nothing to do if it's invalid in CNS and CNSVolumeInfo
if !cnsvolumeinfo.Spec.ValidAggregatedSnapshotSize && aggregatedSnapshotCapacityInMB == -1 {
log.Infof("volume %s aggregated snapshot capacity not present in CNS and CNSVolumeInfo",
cnsVol.VolumeId.Id)
continue
}
// implies the cnsvolumeinfo has capacity mismatch with CNS queryVolume result OR
// existing cnsvolumeinfo aggregated snapshot is not set or invalid
log.Infof("Update aggregatedSnapshotCapacity for volume %s to %dMB",
cnsVol.VolumeId.Id, aggregatedSnapshotCapacityInMB)
// use current time as snapshot completion time is not available in fullsync.
currentTime := time.Now()
cnsSnapInfo := &volumes.CnsSnapshotInfo{
SourceVolumeID: cnsvolumeinfo.Spec.VolumeID,
SnapshotLatestOperationCompleteTime: time.Now(),
AggregatedSnapshotCapacityInMb: aggregatedSnapshotCapacityInMB,
}
log.Infof("snapshot operation completion time unavailable for volumeID %s, will"+
" use current time %v instead", cnsvolumeinfo.Spec.VolumeID, currentTime.String())
patch, err := common.GetValidatedCNSVolumeInfoPatch(ctx, cnsSnapInfo)
if err != nil {
log.Errorf("unable to get VolumeInfo patch for %s. Error: %+v. Continuing..",
cnsvolumeinfo.Spec.VolumeID, err)
continue
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Errorf("error while create VolumeInfo patch for volume %s. Error while marshaling: %+v. Continuing..",
cnsvolumeinfo.Spec.VolumeID, err)
continue
}
err = volumeInfoService.PatchVolumeInfo(ctx, cnsvolumeinfo.Spec.VolumeID, patchBytes,
allowedRetriesToPatchCNSVolumeInfo)
if err != nil {
log.Errorf("failed to patch CNSVolumeInfo instance to update snapshot details."+
"for volume %s. Error: %+v. Continuing..", cnsvolumeinfo.Spec.VolumeID, err)
continue
}
log.Infof("Updated CNSvolumeInfo with Snapshot details successfully for volume %s",
cnsvolumeinfo.Spec.VolumeID)
outOfSyncVolumeCount++
}
}
}
return nil
log.Infof("Number of volumes with synced aggregated snapshot size with CNS %d", alreadySyncedVolumeCount)
log.Infof("Number of volumes with out-of-sync aggregated snapshot size with CNS %d", outOfSyncVolumeCount)
}

// fullSyncCreateVolumes creates volumes with given array of createSpec.
Expand Down Expand Up @@ -1214,7 +1233,7 @@ func fullSyncGetVolumeSpecs(ctx context.Context, vCenterVersion string, pvList [
log.Infof("FullSync for VC %s: update is required for volume: %q", vc, volumeHandle)
operationType = "updateVolume"
} else {
log.Infof("FullSync for VC %s: update is not required for volume: %q", vc, volumeHandle)
log.Debugf("FullSync for VC %s: update is not required for volume: %q", vc, volumeHandle)
}
}
switch operationType {
Expand Down
Loading

0 comments on commit f35c78d

Please sign in to comment.