diff --git a/pkg/common/cns-lib/volume/listview.go b/pkg/common/cns-lib/volume/listview.go index 62574208c7..edc4f5938d 100644 --- a/pkg/common/cns-lib/volume/listview.go +++ b/pkg/common/cns-lib/volume/listview.go @@ -49,6 +49,8 @@ type ListViewImpl struct { shouldStopListening bool // this mutex is used while logging out expired VC session and creating a new one mu sync.RWMutex + + isReady bool } // TaskDetails is used to hold state for a task @@ -86,7 +88,8 @@ func NewListViewImpl(ctx context.Context, virtualCenter *cnsvsphere.VirtualCente func (l *ListViewImpl) createListView(ctx context.Context, tasks []types.ManagedObjectReference) error { log := logger.GetLogger(ctx) - var err error + l.mu.Lock() + defer l.mu.Unlock() if err := l.virtualCenter.Connect(ctx); err != nil { return logger.LogNewErrorf(log, "failed to create a ListView. error: %+v", err) } else { @@ -116,8 +119,11 @@ func (l *ListViewImpl) ResetVirtualCenter(ctx context.Context, virtualCenter *cn log.Info("updating VirtualCenter object reference in ListView") l.virtualCenter = virtualCenter log.Info("cancelling ongoing listView context to trigger restart with new credentials") + l.isReady = false if l.waitForUpdatesCancelFunc != nil { l.waitForUpdatesCancelFunc() + log.Info("called waitForUpdatesCancelFunc") + log.Infof("waitForUpdatesContext %+v", l.waitForUpdatesContext) } log.Info("updated VirtualCenter object reference in ListView") } @@ -157,20 +163,34 @@ func (l *ListViewImpl) isSessionValid(ctx context.Context) bool { return false } +// isListViewReady wraps a read lock to access the state of isReady +func (l *ListViewImpl) isListViewReady() bool { + l.mu.RLock() + defer l.mu.RUnlock() + return l.isReady +} + // AddTask adds task to listView and the internal map func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjectReference, ch chan TaskResult) error { log := logger.GetLogger(ctx) log.Infof("AddTask called for %+v", taskMoRef) + if !l.isListViewReady() { + return fmt.Errorf("%w. task: %v, err: listview not initialized", ErrListViewTaskAddition, taskMoRef) + } + if !l.isSessionValid(ctx) { log.Infof("current session is not valid") - if err := l.virtualCenter.Connect(ctx); err != nil { - return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err) - } - log.Info("cancelling ongoing listView context to re-create listview object") - if l.waitForUpdatesCancelFunc != nil { - l.waitForUpdatesCancelFunc() + l.mu.Lock() + defer l.mu.Unlock() + if l.isReady { + l.isReady = false + if l.waitForUpdatesCancelFunc != nil { + l.waitForUpdatesCancelFunc() + } + log.Infof("cancelled context and set listview state to %v", l.isReady) } + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) } l.taskMap.Upsert(taskMoRef, TaskDetails{ @@ -216,20 +236,25 @@ func (l *ListViewImpl) RemoveTask(ctx context.Context, taskMoRef types.ManagedOb log.Infof("op timeout. context deadline exceeded. using listview context without a timeout") ctx = l.ctx } - if l.listView == nil { - return logger.LogNewErrorf(log, "failed to remove task from listView: listView not initialized") + + if !l.isListViewReady() { + return fmt.Errorf("%w. task: %v, err: listview not initialized", ErrListViewTaskAddition, taskMoRef) } if !l.isSessionValid(ctx) { log.Infof("current session is not valid") - if err := l.virtualCenter.Connect(ctx); err != nil { - return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err) - } - log.Info("cancelling ongoing listView context to re-create listview object") - if l.waitForUpdatesCancelFunc != nil { - l.waitForUpdatesCancelFunc() + l.mu.Lock() + defer l.mu.Unlock() + if l.isReady { + l.isReady = false + if l.waitForUpdatesCancelFunc != nil { + l.waitForUpdatesCancelFunc() + } + log.Infof("cancelled context and set listview state to %v", l.isReady) } + return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef) } + log.Infof("client is valid. trying to remove task from listview object") _, err := l.listView.Remove(l.ctx, []types.ManagedObjectReference{taskMoRef}) if err != nil { @@ -277,7 +302,11 @@ func (l *ListViewImpl) listenToTaskUpdates() { } log.Info("Starting listening for task updates...") + log.Infof("waitForUpdatesContext %v", l.waitForUpdatesContext) pc := property.DefaultCollector(l.virtualCenter.Client.Client) + l.mu.Lock() + l.isReady = true + l.mu.Unlock() err := property.WaitForUpdatesEx(l.waitForUpdatesContext, pc, filter, func(updates []types.ObjectUpdate) bool { log.Debugf("Got %d property collector update(s)", len(updates)) for _, update := range updates { @@ -300,6 +329,9 @@ func (l *ListViewImpl) listenToTaskUpdates() { l.virtualCenter.Config.Host) recreateView = true l.reportErrorOnAllPendingTasks(err) + l.mu.Lock() + l.isReady = false + l.mu.Unlock() } // use case: unit tests: this will help us stop listening // and finish the unit test diff --git a/pkg/common/cns-lib/vsphere/virtualcenter.go b/pkg/common/cns-lib/vsphere/virtualcenter.go index fce16208df..b5db71b9e6 100644 --- a/pkg/common/cns-lib/vsphere/virtualcenter.go +++ b/pkg/common/cns-lib/vsphere/virtualcenter.go @@ -329,9 +329,11 @@ func (vc *VirtualCenter) connect(ctx context.Context, requestNewSession bool) er log.Infof("logging out current session and clearing idle sessions") - err = vc.Client.Logout(ctx) - if err != nil { - log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err) + if vc.Client != nil && vc.Client.Client != nil { + err = vc.Client.Logout(ctx) + if err != nil { + log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err) + } } // If session has expired, create a new instance.