Skip to content

Commit

Permalink
Revert "fix getPVWithVolumeID to get PV from cached volume ID to PV m…
Browse files Browse the repository at this point in the history
…ap" (#3025)

This reverts commit 63bf4fc.
  • Loading branch information
divyenpatel authored Sep 3, 2024
1 parent ea2b91c commit e9f8a2f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 87 deletions.
2 changes: 2 additions & 0 deletions pkg/csi/service/wcp/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"

cnsvolume "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere"
cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
Expand Down Expand Up @@ -1305,6 +1306,7 @@ func (c *controller) ControllerPublishVolume(ctx context.Context, req *csi.Contr
volumeAttachment)
}
}

vmuuid, err := getVMUUIDFromK8sCloudOperatorService(ctx, req.VolumeId, req.NodeId)
if err != nil {
if e, ok := status.FromError(err); ok {
Expand Down
100 changes: 13 additions & 87 deletions pkg/syncer/k8scloudoperator/k8scloudoperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/davecgh/go-spew/spew"
Expand All @@ -37,8 +36,8 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
api "k8s.io/kubernetes/pkg/apis/core"

"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"

Expand All @@ -60,40 +59,7 @@ const (
)

type k8sCloudOperator struct {
k8sClient clientset.Interface
k8sInformer *k8s.InformerManager
pvLister corelisters.PersistentVolumeLister
volumeIDToPVName *volumeIDToPVNameMap
}

// Map of volume handles to the PV
// Key is the volume handle ID and value is name of the PV
// The methods to add, remove and get entries from the map in a thread safe
// manner are defined.
type volumeIDToPVNameMap struct {
*sync.RWMutex
items map[string]string
}

// Adds an entry to volumeIDToPVNameMap in a thread safe manner.
func (m *volumeIDToPVNameMap) add(volumeHandle, pvName string) {
m.Lock()
defer m.Unlock()
m.items[volumeHandle] = pvName
}

// Removes a volume handle from volumeIDToPVNameMap in a thread safe manner.
func (m *volumeIDToPVNameMap) remove(volumeHandle string) {
m.Lock()
defer m.Unlock()
delete(m.items, volumeHandle)
}

// Returns the pv name corresponding to volumeHandle.
func (m *volumeIDToPVNameMap) get(volumeHandle string) string {
m.RLock()
defer m.RUnlock()
return m.items[volumeHandle]
k8sClient clientset.Interface
}

// initK8sCloudOperatorType initializes the k8sCloudOperator struct.
Expand All @@ -108,48 +74,6 @@ func initK8sCloudOperatorType(ctx context.Context) (*k8sCloudOperator, error) {
log.Errorf("Creating Kubernetes client failed. Err: %v", err)
return nil, err
}
k8sCloudOperator.k8sInformer = k8s.NewInformer(ctx, k8sCloudOperator.k8sClient, true)
k8sCloudOperator.volumeIDToPVName = &volumeIDToPVNameMap{
RWMutex: &sync.RWMutex{},
items: make(map[string]string),
}
err = k8sCloudOperator.k8sInformer.AddPVListener(
ctx,
func(obj interface{}) { // Add.
_, log := logger.GetNewContextWithLogger()
pv, ok := obj.(*v1.PersistentVolume)
if pv == nil || !ok {
log.Warnf("pvAdded: unrecognized object %+v", obj)
return
}

if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csitypes.Name {
k8sCloudOperator.volumeIDToPVName.add(pv.Spec.CSI.VolumeHandle, pv.Name)
log.Debugf("VolumeHandle: %q and PV name: %q is added to volumeIDToPVName",
pv.Spec.CSI.VolumeHandle, pv.Name)
}
},
nil,
func(obj interface{}) { // Delete.
_, log := logger.GetNewContextWithLogger()
pv, ok := obj.(*v1.PersistentVolume)
if pv == nil || !ok {
log.Warnf("PVDeleted: unrecognized object %+v", obj)
return
}
log.Debugf("PV: %s deleted. Removing entry from volumeIDToPvcMap", pv.Name)

if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csitypes.Name {
k8sCloudOperator.volumeIDToPVName.remove(pv.Spec.CSI.VolumeHandle)
log.Debugf("VolumeHandle: %q and PV name: %q is removed from volumeIDToPVName",
pv.Spec.CSI.VolumeHandle, pv.Name)
}
})
if err != nil {
log.Errorf("failed to create PV Listener. Err: %v", err)
return nil, err
}
k8sCloudOperator.pvLister = k8sCloudOperator.k8sInformer.GetPVLister()
return &k8sCloudOperator, nil
}

Expand Down Expand Up @@ -316,16 +240,18 @@ func getPodPollIntervalInSecs(ctx context.Context) int {
func (k8sCloudOperator *k8sCloudOperator) getPVWithVolumeID(ctx context.Context,
volumeID string) (*v1.PersistentVolume, error) {
log := logger.GetLogger(ctx)
pvName := k8sCloudOperator.volumeIDToPVName.get(volumeID)
if pvName != "" {
log.Infof("found PV Name: %q for VolumeID: %q from cache", pvName, volumeID)
pv, err := k8sCloudOperator.pvLister.Get(pvName)
if err != nil {
log.Errorf("failed to retrieve PV with volume ID: %q from API server. err: %v", volumeID, err)
return nil, err
allPVs, err := k8sCloudOperator.k8sClient.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
if err != nil {
log.Errorf("failed to retrieve all PVs from API server")
return nil, err
}
for _, pv := range allPVs.Items {
// Verify if it is vsphere block driver and volumehandle matches the
// volume ID.
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csitypes.Name && pv.Spec.CSI.VolumeHandle == volumeID {
log.Debugf("Found PV: %+v referring to volume ID: %s", pv, volumeID)
return &pv, nil
}
log.Debugf("Found PV: %+v referring to volume ID: %s", pv, volumeID)
return pv, nil
}
return nil, logger.LogNewErrorf(log, "failed to find PV referring to volume ID: %s", volumeID)
}
Expand Down

0 comments on commit e9f8a2f

Please sign in to comment.