Fix syncing the aggregated snapshot size between CNSVolumeInfo and CNS query result #3073

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch/v5 v5.6.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-logr/zapr v1.2.4
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.6.0
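The go.mod addition above pulls in github.com/go-logr/zapr, an adapter that exposes a zap logger through the logr.Logger interface. The PR itself does not show how the new dependency is used, so the snippet below is only a minimal, hypothetical usage sketch of the zapr API, not code from this change:

```go
package main

import (
	"github.com/go-logr/zapr"
	"go.uber.org/zap"
)

func main() {
	// Build a standard production zap logger.
	zapLog, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	defer func() { _ = zapLog.Sync() }()

	// Wrap it so libraries that expect a logr.Logger can use it.
	log := zapr.NewLogger(zapLog)
	log.Info("zapr-backed logger ready", "component", "example")
}
```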
5 changes: 5 additions & 0 deletions go.sum
@@ -88,6 +88,7 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.35.24/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -675,12 +676,16 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 h1:+FNtrFTmVw0YZGpBGX56XDee331t6JAXeK2bcyhLOOc=
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0Hg7FvpRQsQh5OSqIylirxKC7o=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
2 changes: 1 addition & 1 deletion pkg/common/cns-lib/vsphere/utils.go
@@ -455,7 +455,7 @@ func GetCandidateDatastoresInCluster(ctx context.Context, vc *VirtualCenter, clu
if len(sharedDatastores) == 0 && len(vsanDirectDatastores) == 0 {
return nil, nil, fmt.Errorf("no candidate datastores found in the Kubernetes cluster")
}
log.Infof("Found shared datastores: %+v and vSAN Direct datastores: %+v", sharedDatastores,
log.Debugf("Found shared datastores: %+v and vSAN Direct datastores: %+v", sharedDatastores,
vsanDirectDatastores)
return sharedDatastores, vsanDirectDatastores, nil
}
4 changes: 2 additions & 2 deletions pkg/csi/service/common/authmanager.go
@@ -180,7 +180,7 @@ func (authManager *AuthManager) refreshFSEnabledClustersToDsMap() {
defer authManager.rwMutex.Unlock()

authManager.fsEnabledClusterToDsMap = newFsEnabledClusterToDsMap
log.Infof("auth manager: newFsEnabledClusterToDsMap is updated to %v for vCenter %q",
log.Debugf("auth manager: newFsEnabledClusterToDsMap is updated to %v for vCenter %q",
newFsEnabledClusterToDsMap, vcenterHost)
} else {
log.Warnf("auth manager: failed to get updated datastoreMapForFileVolumes for vCenter %q, Err: %v",
@@ -502,7 +502,7 @@ func getFSEnabledClustersWithPriv(ctx context.Context, vc *cnsvsphere.VirtualCen
log.Debugf("vSAN file service is enabled for cluster: %+v and vCenter: %q.",
cluster, vc.Config.Host)
} else {
log.Infof("vSAN file service is disabled for cluster: %+v and vCenter: %q.",
log.Debugf("vSAN file service is disabled for cluster: %+v and vCenter: %q.",
Contributor:
Some of these messages were intentionally made Info messages as part of RCCA of SR. Please confirm with Divyen and keep the required messages as Info. Most of the other Debug messages look OK to me.

Contributor:
You can refer to https://github.com/kubernetes-sigs/vsphere-csi-driver/pull/2490/files and skip updating messages from this file.

cluster, vc.Config.Host)
}
}
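Several hunks in this PR downgrade log.Infof calls to log.Debugf, and the review comments above ask for some of them to stay at Info. The calls in the diff match zap's SugaredLogger API (Infof, Debugf, Warnf, Errorf), so the standalone sketch below, which is illustrative rather than driver code, shows how the configured level decides whether a Debugf line is emitted at all:

```go
package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	// Production config defaults to the Info level; Debugf output is dropped.
	cfg := zap.NewProductionConfig()
	cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
	base, err := cfg.Build()
	if err != nil {
		panic(err)
	}
	log := base.Sugar()

	log.Infof("visible at Info level: %d shared datastores found", 3)
	log.Debugf("hidden at Info level: %v", []string{"ds-1", "ds-2"})

	// Raising verbosity at runtime makes the Debugf lines appear as well.
	cfg.Level.SetLevel(zapcore.DebugLevel)
	log.Debugf("now visible after switching to Debug level")
}
```

In an Info-level deployment, every message downgraded by this PR disappears from the logs unless verbosity is raised, which appears to be the concern behind the reviewer's request to keep selected messages at Info.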
18 changes: 9 additions & 9 deletions pkg/kubernetes/kubernetes.go
@@ -72,14 +72,14 @@ func GetKubeConfig(ctx context.Context) (*restclient.Config, error) {
var err error
kubecfgPath := getKubeConfigPath(ctx)
if kubecfgPath != "" {
log.Infof("k8s client using kubeconfig from %s", kubecfgPath)
log.Debugf("k8s client using kubeconfig from %s", kubecfgPath)
config, err = clientcmd.BuildConfigFromFlags("", kubecfgPath)
if err != nil {
log.Errorf("BuildConfigFromFlags failed %v", err)
return nil, err
}
} else {
log.Info("k8s client using in-cluster config")
log.Debug("k8s client using in-cluster config")
config, err = restclient.InClusterConfig()
if err != nil {
log.Errorf("InClusterConfig failed %v", err)
@@ -102,24 +102,24 @@ func getKubeConfigPath(ctx context.Context) string {
flagValue := kubecfgFlag.Value.String()
if flagValue != "" {
kubecfgPath = flagValue
log.Infof("Kubeconfig path obtained from kubeconfig flag: %q", kubecfgPath)
log.Debugf("Kubeconfig path obtained from kubeconfig flag: %q", kubecfgPath)
} else {
log.Info("Kubeconfig flag is set but empty, checking environment variable value")
log.Debug("Kubeconfig flag is set but empty, checking environment variable value")
}
} else {
log.Info("Kubeconfig flag not set, checking environment variable value")
log.Debug("Kubeconfig flag not set, checking environment variable value")
}
if kubecfgPath == "" {
// Get the Kubeconfig path from the environment variable
kubecfgPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
log.Infof("Kubeconfig path obtained from environment variable %q: %q",
log.Debugf("Kubeconfig path obtained from environment variable %q: %q",
clientcmd.RecommendedConfigPathEnvVar, kubecfgPath)
}
// Final logging of the Kubeconfig path used
if kubecfgPath == "" {
log.Info("No Kubeconfig path found, either from environment variable or flag")
log.Debug("No Kubeconfig path found, either from environment variable or flag")
} else {
log.Infof("Final Kubeconfig path used: %q", kubecfgPath)
log.Debugf("Final Kubeconfig path used: %q", kubecfgPath)
}
return kubecfgPath
}
@@ -431,7 +431,7 @@ func getClientThroughput(ctx context.Context, isSupervisorClient bool) (float32,
burst = value
}
}
log.Infof("Setting client QPS to %f and Burst to %d.", qps, burst)
log.Debugf("Setting client QPS to %f and Burst to %d.", qps, burst)
return qps, burst
}

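The kubernetes.go hunks above resolve the kubeconfig path from the kubeconfig flag, then from the KUBECONFIG environment variable, and finally fall back to in-cluster configuration, while getClientThroughput tunes the client's QPS and Burst. The sketch below reproduces that fallback pattern with client-go directly; buildConfig and the hard-coded QPS/Burst values are illustrative stand-ins, not the driver's actual helpers:

```go
package main

import (
	"fmt"
	"os"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// buildConfig mirrors the flag -> environment variable -> in-cluster fallback.
func buildConfig(flagPath string) (*rest.Config, error) {
	path := flagPath
	if path == "" {
		// clientcmd.RecommendedConfigPathEnvVar is "KUBECONFIG".
		path = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
	}
	if path != "" {
		return clientcmd.BuildConfigFromFlags("", path)
	}
	// No kubeconfig path found anywhere; assume we are running inside a pod.
	return rest.InClusterConfig()
}

func main() {
	cfg, err := buildConfig("")
	if err != nil {
		fmt.Fprintln(os.Stderr, "failed to build config:", err)
		os.Exit(1)
	}
	// QPS and Burst can be tuned on the returned config, as the values from
	// getClientThroughput ultimately are in the driver.
	cfg.QPS = 100
	cfg.Burst = 100
	fmt.Println("API server:", cfg.Host)
}
```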
112 changes: 66 additions & 46 deletions pkg/syncer/fullsync.go
@@ -25,6 +25,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/davecgh/go-spew/spew"
"github.com/vmware/govmomi/cns"
@@ -314,12 +316,7 @@ func CsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer, vc s
cnsVolumeMap[vol.VolumeId.Id] = vol
}
log.Infof("calling validateAndCorrectVolumeInfoSnapshotDetails with %d volumes", len(cnsVolumeMap))
err = validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
if err != nil {
log.Errorf("FullSync for VC %s: Error while sync CNSVolumeinfo snapshot details, failed with err=%+v",
vc, err.Error())
return err
}
validateAndCorrectVolumeInfoSnapshotDetails(ctx, cnsVolumeMap)
}
vcHostObj, vcHostObjFound := metadataSyncer.configInfo.Cfg.VirtualCenter[vc]
if !vcHostObjFound {
Expand Down Expand Up @@ -767,9 +764,11 @@ func volumeInfoCRFullSync(ctx context.Context, metadataSyncer *metadataSyncInfor
// in CNS with the size in CNSVolumeInfo. If a discrepancy is found, update the
// CNSVolumeInfo to correct the values.
func validateAndCorrectVolumeInfoSnapshotDetails(ctx context.Context,
cnsVolumeMap map[string]cnstypes.CnsVolume) error {
cnsVolumeMap map[string]cnstypes.CnsVolume) {
log := logger.GetLogger(ctx)
volumeInfoCRList := volumeInfoService.ListAllVolumeInfos()
alreadySyncedVolumeCount := 0
outOfSyncVolumeCount := 0
for _, volumeInfo := range volumeInfoCRList {
cnsvolumeinfo := &cnsvolumeinfov1alpha1.CNSVolumeInfo{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(volumeInfo.(*unstructured.Unstructured).Object,
@@ -779,54 +778,75 @@ func validateAndCorrectVolumeInfoSnapshotDetails(ctx context.Context,
continue
}
if cnsVol, ok := cnsVolumeMap[cnsvolumeinfo.Spec.VolumeID]; ok {
log.Infof("validate volume info for storage details for volume %s", cnsVol.VolumeId.Id)
var aggregatedSnapshotCapacity int64
log.Debugf("validate volume info for storage details for volume %s", cnsVol.VolumeId.Id)
var aggregatedSnapshotCapacityInMB int64
if cnsVol.BackingObjectDetails != nil &&
cnsVol.BackingObjectDetails.(*cnstypes.CnsBlockBackingDetails) != nil {
val, ok := cnsVol.BackingObjectDetails.(*cnstypes.CnsBlockBackingDetails)
if ok {
aggregatedSnapshotCapacity = val.AggregatedSnapshotCapacityInMb
aggregatedSnapshotCapacityInMB = val.AggregatedSnapshotCapacityInMb
}
log.Infof("Received aggregatedSnapshotCapacity %d for volume %q",
aggregatedSnapshotCapacity, cnsVol.VolumeId.Id)
if cnsvolumeinfo.Spec.AggregatedSnapshotSize == nil || aggregatedSnapshotCapacity !=
cnsvolumeinfo.Spec.AggregatedSnapshotSize.Value() {
// use current time as snapshot completion time is not available in fullsync.
log.Infof("Update aggregatedSnapshotCapacity for volume %q", cnsVol.VolumeId.Id)
currentTime := time.Now()
cnsSnapInfo := &volumes.CnsSnapshotInfo{
SourceVolumeID: cnsvolumeinfo.Spec.VolumeID,
SnapshotLatestOperationCompleteTime: time.Now(),
AggregatedSnapshotCapacityInMb: aggregatedSnapshotCapacity,
}
log.Infof("unable to get snapshot operation completion time for volumeID %q "+
"will use current time %v instead", cnsvolumeinfo.Spec.VolumeID, currentTime)
patch, err := common.GetValidatedCNSVolumeInfoPatch(ctx, cnsSnapInfo)
if err != nil {
log.Errorf("unable to get VolumeInfo patch for %q. Error: %+v",
cnsvolumeinfo.Spec.VolumeID, err)
return err
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Errorf("error while create VolumeInfo patch for volume %q. Error while marshaling: %+v",
cnsvolumeinfo.Spec.VolumeID, err)
return err
}
err = volumeInfoService.PatchVolumeInfo(ctx, cnsvolumeinfo.Spec.VolumeID, patchBytes,
allowedRetriesToPatchCNSVolumeInfo)
if err != nil {
log.Errorf("failed to patch CNSVolumeInfo instance to update snapshot details."+
"for volume %q. Error: %+v", cnsvolumeinfo.Spec.VolumeID, err)
return err
log.Debugf("Received aggregatedSnapshotCapacity %dMB for volume %s from CNS",
aggregatedSnapshotCapacityInMB, cnsVol.VolumeId.Id)
if cnsvolumeinfo.Spec.AggregatedSnapshotSize != nil && cnsvolumeinfo.Spec.ValidAggregatedSnapshotSize {
aggregatedSnapshotCapacity := resource.NewQuantity(aggregatedSnapshotCapacityInMB*common.MbInBytes,
resource.BinarySI)
if aggregatedSnapshotCapacity.Value() == cnsvolumeinfo.Spec.AggregatedSnapshotSize.Value() {
log.Debugf("volume %s Aggregated Snapshot capacity %s in CnsVolumeInfo is in sync with CNS",
cnsVol.VolumeId.Id, aggregatedSnapshotCapacity.String())
alreadySyncedVolumeCount++
continue
}
log.Infof("Updated CNSvolumeInfo with Snapshot details successfully for volume %q",
cnsvolumeinfo.Spec.VolumeID)
log.Infof("Aggregated Snapshot size mismatch for volume %s, %s in CnsVolumeInfo and %dMB in CNS",
cnsVol.VolumeId.Id, cnsvolumeinfo.Spec.AggregatedSnapshotSize.String(),
aggregatedSnapshotCapacityInMB)
}
// Nothing to do if it's invalid in CNS and CNSVolumeInfo
if !cnsvolumeinfo.Spec.ValidAggregatedSnapshotSize && aggregatedSnapshotCapacityInMB == -1 {
log.Infof("volume %s aggregated snapshot capacity not present in CNS and CNSVolumeInfo",
cnsVol.VolumeId.Id)
continue
}
// implies the cnsvolumeinfo has capacity mismatch with CNS queryVolume result OR
// existing cnsvolumeinfo aggregated snapshot is not set or invalid
log.Infof("Update aggregatedSnapshotCapacity for volume %s to %dMB",
cnsVol.VolumeId.Id, aggregatedSnapshotCapacityInMB)
// use current time as snapshot completion time is not available in fullsync.
currentTime := time.Now()
cnsSnapInfo := &volumes.CnsSnapshotInfo{
SourceVolumeID: cnsvolumeinfo.Spec.VolumeID,
SnapshotLatestOperationCompleteTime: time.Now(),
AggregatedSnapshotCapacityInMb: aggregatedSnapshotCapacityInMB,
}
log.Infof("snapshot operation completion time unavailable for volumeID %s, will"+
" use current time %v instead", cnsvolumeinfo.Spec.VolumeID, currentTime.String())
patch, err := common.GetValidatedCNSVolumeInfoPatch(ctx, cnsSnapInfo)
if err != nil {
log.Errorf("unable to get VolumeInfo patch for %s. Error: %+v. Continuing..",
cnsvolumeinfo.Spec.VolumeID, err)
continue
}
patchBytes, err := json.Marshal(patch)
if err != nil {
log.Errorf("error while creating VolumeInfo patch for volume %s. Error while marshaling: %+v. Continuing..",
cnsvolumeinfo.Spec.VolumeID, err)
continue
}
err = volumeInfoService.PatchVolumeInfo(ctx, cnsvolumeinfo.Spec.VolumeID, patchBytes,
allowedRetriesToPatchCNSVolumeInfo)
if err != nil {
log.Errorf("failed to patch CNSVolumeInfo instance to update snapshot details "+
"for volume %s. Error: %+v. Continuing..", cnsvolumeinfo.Spec.VolumeID, err)
continue
}
log.Infof("Updated CNSVolumeInfo with snapshot details successfully for volume %s",
cnsvolumeinfo.Spec.VolumeID)
outOfSyncVolumeCount++
}
}
}
return nil
log.Infof("Number of volumes with aggregated snapshot size already in sync with CNS: %d", alreadySyncedVolumeCount)
log.Infof("Number of volumes with aggregated snapshot size out of sync with CNS: %d", outOfSyncVolumeCount)
}

// fullSyncCreateVolumes creates volumes with given array of createSpec.
@@ -1214,7 +1234,7 @@ func fullSyncGetVolumeSpecs(ctx context.Context, vCenterVersion string, pvList [
log.Infof("FullSync for VC %s: update is required for volume: %q", vc, volumeHandle)
operationType = "updateVolume"
} else {
log.Infof("FullSync for VC %s: update is not required for volume: %q", vc, volumeHandle)
log.Debugf("FullSync for VC %s: update is not required for volume: %q", vc, volumeHandle)
}
}
switch operationType {
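The substance of the fix is in validateAndCorrectVolumeInfoSnapshotDetails: instead of aborting full sync on the first error, it now compares the aggregated snapshot capacity reported by the CNS query (in MB) with the resource.Quantity stored on the CNSVolumeInfo CR, skips volumes that are already in sync or invalid on both sides, patches the rest, and keeps going while counting synced and out-of-sync volumes. The sketch below isolates just that comparison decision; the mbInBytes constant and the sample values are illustrative stand-ins, not the driver's own definitions:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Illustrative stand-in for the driver's MB-to-bytes constant.
const mbInBytes int64 = 1024 * 1024

// needsPatch reports whether the CNSVolumeInfo aggregated snapshot size
// is out of sync with the capacity (in MB) returned by a CNS query.
func needsPatch(storedSize *resource.Quantity, validStored bool, cnsCapacityInMB int64) bool {
	// CNS reports -1 when it has no valid aggregate; nothing to reconcile
	// when neither side has a usable value.
	if !validStored && cnsCapacityInMB == -1 {
		return false
	}
	if storedSize == nil || !validStored {
		return true
	}
	fromCNS := resource.NewQuantity(cnsCapacityInMB*mbInBytes, resource.BinarySI)
	return fromCNS.Value() != storedSize.Value()
}

func main() {
	stored := resource.NewQuantity(512*mbInBytes, resource.BinarySI)

	fmt.Println(needsPatch(stored, true, 512)) // false: already in sync
	fmt.Println(needsPatch(stored, true, 768)) // true: CNS reports a larger aggregate
	fmt.Println(needsPatch(nil, false, -1))    // false: invalid on both sides, skip
}
```

Treating a CNS value of -1 as "no valid aggregate" mirrors the new early-continue branch in the diff, so full sync no longer tries to patch CRs when neither side has usable data.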