diff --git a/pkg/common/cns-lib/volume/listview.go b/pkg/common/cns-lib/volume/listview.go index 62574208c7..4a339d681b 100644 --- a/pkg/common/cns-lib/volume/listview.go +++ b/pkg/common/cns-lib/volume/listview.go @@ -49,6 +49,8 @@ type ListViewImpl struct { shouldStopListening bool // this mutex is used while logging out expired VC session and creating a new one mu sync.RWMutex + // isReady defines the ready state of the listview + property collector mechanism + isReady bool } // TaskDetails is used to hold state for a task @@ -86,7 +88,6 @@ func NewListViewImpl(ctx context.Context, virtualCenter *cnsvsphere.VirtualCente func (l *ListViewImpl) createListView(ctx context.Context, tasks []types.ManagedObjectReference) error { log := logger.GetLogger(ctx) - var err error if err := l.virtualCenter.Connect(ctx); err != nil { return logger.LogNewErrorf(log, "failed to create a ListView. error: %+v", err) } else { @@ -105,20 +106,14 @@ func (l *ListViewImpl) createListView(ctx context.Context, tasks []types.Managed } // ResetVirtualCenter updates the VC object reference. -// It also triggers a restart of listview object and connection to VC. -// This is required as VC.Connect() can return true as the VC object points to latest config -// but adding a task to a listview object created with an older VC object will error out // use case: ReloadConfiguration func (l *ListViewImpl) ResetVirtualCenter(ctx context.Context, virtualCenter *cnsvsphere.VirtualCenter) { log := logger.GetLogger(ctx) + log.Info("attempting to acquire lock before updating vc object") l.mu.Lock() defer l.mu.Unlock() - log.Info("updating VirtualCenter object reference in ListView") + log.Info("acquired lock before updating vc object") l.virtualCenter = virtualCenter - log.Info("cancelling ongoing listView context to trigger restart with new credentials") - if l.waitForUpdatesCancelFunc != nil { - l.waitForUpdatesCancelFunc() - } log.Info("updated VirtualCenter object reference in ListView") } @@ -137,8 +132,8 @@ func getListViewWaitFilter(listView *view.ListView) *property.WaitFilter { } func (l *ListViewImpl) isSessionValid(ctx context.Context) bool { - l.mu.Lock() - defer l.mu.Unlock() + l.mu.RLock() + defer l.mu.RUnlock() log := logger.GetLogger(ctx) if l.virtualCenter.Client == nil || l.virtualCenter.Client.Client == nil { return false @@ -157,20 +152,26 @@ func (l *ListViewImpl) isSessionValid(ctx context.Context) bool { return false } +// IsListViewReady wraps a read lock to access the state of isReady +func (l *ListViewImpl) IsListViewReady() bool { + l.mu.RLock() + defer l.mu.RUnlock() + return l.isReady +} + // AddTask adds task to listView and the internal map func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjectReference, ch chan TaskResult) error { log := logger.GetLogger(ctx) log.Infof("AddTask called for %+v", taskMoRef) + if !l.IsListViewReady() { + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) + } + if !l.isSessionValid(ctx) { log.Infof("current session is not valid") - if err := l.virtualCenter.Connect(ctx); err != nil { - return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err) - } - log.Info("cancelling ongoing listView context to re-create listview object") - if l.waitForUpdatesCancelFunc != nil { - l.waitForUpdatesCancelFunc() - } + l.SetListViewNotReady(ctx) + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) } l.taskMap.Upsert(taskMoRef, TaskDetails{ @@ -184,6 +185,7 @@ func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjec response, err := l.listView.Add(l.ctx, []types.ManagedObjectReference{taskMoRef}) if err != nil { l.taskMap.Delete(taskMoRef) + l.SetListViewNotReady(ctx) return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err) } if len(response) > 0 { @@ -216,23 +218,21 @@ func (l *ListViewImpl) RemoveTask(ctx context.Context, taskMoRef types.ManagedOb log.Infof("op timeout. context deadline exceeded. using listview context without a timeout") ctx = l.ctx } - if l.listView == nil { - return logger.LogNewErrorf(log, "failed to remove task from listView: listView not initialized") + + if !l.IsListViewReady() { + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) } if !l.isSessionValid(ctx) { log.Infof("current session is not valid") - if err := l.virtualCenter.Connect(ctx); err != nil { - return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err) - } - log.Info("cancelling ongoing listView context to re-create listview object") - if l.waitForUpdatesCancelFunc != nil { - l.waitForUpdatesCancelFunc() - } + l.SetListViewNotReady(ctx) + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) } + log.Infof("client is valid. trying to remove task from listview object") _, err := l.listView.Remove(l.ctx, []types.ManagedObjectReference{taskMoRef}) if err != nil { + l.SetListViewNotReady(ctx) return logger.LogNewErrorf(log, "failed to remove task %v from ListView. error: %+v", taskMoRef, err) } log.Infof("task %+v removed from listView", taskMoRef) @@ -241,6 +241,28 @@ func (l *ListViewImpl) RemoveTask(ctx context.Context, taskMoRef types.ManagedOb return nil } +func (l *ListViewImpl) SetListViewNotReady(ctx context.Context) { + log := logger.GetLogger(ctx) + log.Infof("waiting for lock before setting listview to not ready") + l.mu.Lock() + defer l.mu.Unlock() + log.Infof("acquired lock before setting listview to not ready") + l.isReady = false + if l.waitForUpdatesCancelFunc != nil { + l.waitForUpdatesCancelFunc() + } + log.Info("cancelled context") +} + +func (l *ListViewImpl) connect() error { + log := logger.GetLogger(l.ctx) + log.Infof("waiting for lock before calling connect") + l.mu.Lock() + defer l.mu.Unlock() + log.Infof("acquired lock before calling connect") + return l.virtualCenter.Connect(l.ctx) +} + // listenToTaskUpdates is a long-running goroutine // that uses a property collector to listen for task updates // CSI ops add CNS tasks to listview and wait for a response from CNS @@ -255,7 +277,7 @@ func (l *ListViewImpl) listenToTaskUpdates() { recreateView := false for { // calling Connect at the beginning to ensure the current session is neither nil nor NotAuthenticated - if err := l.virtualCenter.Connect(l.ctx); err != nil { + if err := l.connect(); err != nil { log.Errorf("failed to connect to vCenter. err: %v", err) time.Sleep(waitForUpdatesRetry) continue @@ -263,11 +285,15 @@ func (l *ListViewImpl) listenToTaskUpdates() { log.Infof("connection to vc successful") } + log.Infof("attempting lock before re-creating listview") + l.mu.Lock() + log.Infof("acquired lock before re-creating listview") if recreateView { log.Info("re-creating the listView object") err := l.createListView(l.ctx, nil) if err != nil { log.Errorf("failed to create a ListView. error: %+v", err) + l.mu.Unlock() continue } @@ -277,7 +303,11 @@ func (l *ListViewImpl) listenToTaskUpdates() { } log.Info("Starting listening for task updates...") + log.Infof("waitForUpdatesContext %v", l.waitForUpdatesContext) pc := property.DefaultCollector(l.virtualCenter.Client.Client) + l.isReady = true + log.Infof("listview ready state is %v", l.isReady) + l.mu.Unlock() err := property.WaitForUpdatesEx(l.waitForUpdatesContext, pc, filter, func(updates []types.ObjectUpdate) bool { log.Debugf("Got %d property collector update(s)", len(updates)) for _, update := range updates { @@ -300,6 +330,13 @@ func (l *ListViewImpl) listenToTaskUpdates() { l.virtualCenter.Config.Host) recreateView = true l.reportErrorOnAllPendingTasks(err) + log.Info("waiting for lock before setting listview ready state to false") + l.mu.Lock() + log.Info("acquired lock before setting listview ready state to false") + log.Infof("setting listview ready state to false. current ready state: %v", l.isReady) + l.isReady = false + log.Infof("listview ready state is %v", l.isReady) + l.mu.Unlock() } // use case: unit tests: this will help us stop listening // and finish the unit test diff --git a/pkg/common/cns-lib/volume/listview_if.go b/pkg/common/cns-lib/volume/listview_if.go index 124417a8ee..328cfc93af 100644 --- a/pkg/common/cns-lib/volume/listview_if.go +++ b/pkg/common/cns-lib/volume/listview_if.go @@ -24,4 +24,9 @@ type ListViewIf interface { // MarkTaskForDeletion marks a given task MoRef for deletion by a cleanup goroutine // use case: failure to remove task due to a vc issue MarkTaskForDeletion(ctx context.Context, taskMoRef types.ManagedObjectReference) error + // IsListViewReady returns the status of the listview + property collector mechanism + IsListViewReady() bool + // SetListViewNotReady explicitly states the listview state as not ready + // use case: unit tests + SetListViewNotReady(ctx context.Context) } diff --git a/pkg/common/cns-lib/volume/manager.go b/pkg/common/cns-lib/volume/manager.go index 888c5e3781..7e9d4eb642 100644 --- a/pkg/common/cns-lib/volume/manager.go +++ b/pkg/common/cns-lib/volume/manager.go @@ -145,6 +145,11 @@ type Manager interface { string, error) // GetOperationStore returns the VolumeOperationRequest interface GetOperationStore() cnsvolumeoperationrequest.VolumeOperationRequest + // IsListViewReady returns the status of the listview + property collector mechanism + IsListViewReady() bool + // SetListViewNotReady explicitly states the listview state as not ready + // use case: unit tests + SetListViewNotReady(ctx context.Context) } // CnsVolumeInfo hold information related to volume created by CNS. @@ -3020,3 +3025,16 @@ func (m *defaultManager) getAggregatedSnapshotSize(ctx context.Context, volumeID } return aggregatedSnapshotCapacity, nil } + +func (m *defaultManager) IsListViewReady() bool { + if m.listViewIf == nil { + return false + } + return m.listViewIf.IsListViewReady() +} + +func (m *defaultManager) SetListViewNotReady(ctx context.Context) { + if m.listViewIf != nil { + m.listViewIf.SetListViewNotReady(ctx) + } +} diff --git a/pkg/common/cns-lib/vsphere/virtualcenter.go b/pkg/common/cns-lib/vsphere/virtualcenter.go index fd914a6c48..580580acdf 100644 --- a/pkg/common/cns-lib/vsphere/virtualcenter.go +++ b/pkg/common/cns-lib/vsphere/virtualcenter.go @@ -341,9 +341,11 @@ func (vc *VirtualCenter) connect(ctx context.Context, requestNewSession bool) er log.Infof("logging out current session and clearing idle sessions") - err = vc.Client.Logout(ctx) - if err != nil { - log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err) + if vc.Client != nil && vc.Client.Client != nil { + err = vc.Client.Logout(ctx) + if err != nil { + log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err) + } } // If session has expired, create a new instance. diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go index a5a05df15d..74e3f45861 100644 --- a/pkg/common/utils/utils_test.go +++ b/pkg/common/utils/utils_test.go @@ -8,10 +8,12 @@ import ( "os" "sync" "testing" + "time" cnssim "github.com/vmware/govmomi/cns/simulator" "github.com/vmware/govmomi/cns/types" "github.com/vmware/govmomi/simulator" + "k8s.io/apimachinery/pkg/util/wait" cnsvolumes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume" cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere" @@ -29,8 +31,7 @@ var ( ) type commonUtilsTest struct { - config *cnsconfig.Config - vcenter *cnsvsphere.VirtualCenter + volumeManager cnsvolumes.Manager } // configFromSim starts a vcsim instance and returns config for use against the vcsim instance. @@ -129,9 +130,23 @@ func getCommonUtilsTest(t *testing.T) *commonUtilsTest { if err != nil { t.Fatal(err) } + + volumeManager, err := cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "") + if err != nil { + t.Fatalf("failed to create an instance of volume manager. err=%v", err) + } + + // wait till property collector has been started + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, false, + func(ctx context.Context) (done bool, err error) { + return volumeManager.IsListViewReady(), nil + }) + if err != nil { + t.Fatalf("listview not ready. err=%v", err) + } + commonUtilsTestInstance = &commonUtilsTest{ - config: csiConfig, - vcenter: virtualCenter, + volumeManager: volumeManager, } }) return commonUtilsTestInstance @@ -141,11 +156,6 @@ func TestQuerySnapshotsUtil(t *testing.T) { // Create context commonUtilsTestInstance := getCommonUtilsTest(t) - volumeManager, err := cnsvolumes.GetManager(ctx, commonUtilsTestInstance.vcenter, nil, false, false, false, "") - if err != nil { - t.Fatalf("failed to create an instance of volume manager. err=%v", err) - } - queryFilter := types.CnsSnapshotQueryFilter{ SnapshotQuerySpecs: nil, Cursor: &types.CnsCursor{ @@ -153,11 +163,12 @@ func TestQuerySnapshotsUtil(t *testing.T) { Limit: 10, }, } - queryResultEntries, _, err := QuerySnapshotsUtil(ctx, volumeManager, queryFilter, DefaultQuerySnapshotLimit) + queryResultEntries, _, err := QuerySnapshotsUtil(ctx, commonUtilsTestInstance.volumeManager, queryFilter, + DefaultQuerySnapshotLimit) if err != nil { t.Error(err) } - //TODO: Create Snapshots using CreateSnapshot API. + // TODO: Create Snapshots using CreateSnapshot API. t.Log("Snapshots: ") for _, entry := range queryResultEntries { t.Log(entry) diff --git a/pkg/csi/service/vanilla/controller_topology_test.go b/pkg/csi/service/vanilla/controller_topology_test.go index 229880e9af..92de365360 100644 --- a/pkg/csi/service/vanilla/controller_topology_test.go +++ b/pkg/csi/service/vanilla/controller_topology_test.go @@ -23,10 +23,12 @@ import ( "strconv" "sync" "testing" + "time" "github.com/google/uuid" cnstypes "github.com/vmware/govmomi/cns/types" "github.com/vmware/govmomi/find" + "k8s.io/apimachinery/pkg/util/wait" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/vmware/govmomi/simulator" @@ -365,6 +367,16 @@ func getControllerTestWithTopology(t *testing.T) *controllerTestTopology { if err != nil { t.Fatalf("failed to create an instance of volume manager. err=%v", err) } + + // wait till property collector has been started + err = wait.PollUntilContextTimeout(ctxtopology, 1*time.Second, 10*time.Second, false, + func(ctx context.Context) (done bool, err error) { + return volumeManager.IsListViewReady(), nil + }) + if err != nil { + t.Fatalf("listview not ready. err=%v", err) + } + // GetManager returns a singleton instance of VolumeManager. So, it could be pointing // to old VC instance as part of previous unit test run from same folder. // Call ResetManager to get new VolumeManager instance with current VC configuration. @@ -373,6 +385,22 @@ func getControllerTestWithTopology(t *testing.T) *controllerTestTopology { t.Fatalf("failed to reset volume manager with new vcenter. err=%v", err) } + // as per current logic, new vc object will be saved but not immediately used + // only when we notice an issue adding tasks to listview, we will kill the context to property collector + // causing the listview to be re-created with the newer credentials + // this method is called here to explicitly re-create the listview since we changed the config above for + // topology + volumeManager.SetListViewNotReady(ctxtopology) + + // wait again for the property collector to be re-created + err = wait.PollUntilContextTimeout(ctxtopology, 1*time.Second, 10*time.Second, false, + func(ctx context.Context) (done bool, err error) { + return volumeManager.IsListViewReady(), nil + }) + if err != nil { + t.Fatalf("listview not ready. err=%v", err) + } + manager := &common.Manager{ VcenterConfig: vcenterconfig, CnsConfig: config,