Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle vc reboot session invalid for listview #3064

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions pkg/common/cns-lib/volume/listview.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ListViewImpl struct {
shouldStopListening bool
// this mutex is used while logging out expired VC session and creating a new one
mu sync.RWMutex
// isReady defines the ready state of the listview + property collector mechanism
isReady bool
}

// TaskDetails is used to hold state for a task
Expand Down Expand Up @@ -86,7 +88,6 @@ func NewListViewImpl(ctx context.Context, virtualCenter *cnsvsphere.VirtualCente

func (l *ListViewImpl) createListView(ctx context.Context, tasks []types.ManagedObjectReference) error {
log := logger.GetLogger(ctx)
var err error
if err := l.virtualCenter.Connect(ctx); err != nil {
return logger.LogNewErrorf(log, "failed to create a ListView. error: %+v", err)
} else {
Expand All @@ -105,20 +106,14 @@ func (l *ListViewImpl) createListView(ctx context.Context, tasks []types.Managed
}

// ResetVirtualCenter updates the VC object reference.
// It also triggers a restart of listview object and connection to VC.
// This is required as VC.Connect() can return true as the VC object points to latest config
// but adding a task to a listview object created with an older VC object will error out
// use case: ReloadConfiguration
func (l *ListViewImpl) ResetVirtualCenter(ctx context.Context, virtualCenter *cnsvsphere.VirtualCenter) {
log := logger.GetLogger(ctx)
log.Info("attempting to acquire lock before updating vc object")
l.mu.Lock()
defer l.mu.Unlock()
log.Info("updating VirtualCenter object reference in ListView")
log.Info("acquired lock before updating vc object")
l.virtualCenter = virtualCenter
log.Info("cancelling ongoing listView context to trigger restart with new credentials")
if l.waitForUpdatesCancelFunc != nil {
l.waitForUpdatesCancelFunc()
}
log.Info("updated VirtualCenter object reference in ListView")
}

Expand All @@ -137,8 +132,8 @@ func getListViewWaitFilter(listView *view.ListView) *property.WaitFilter {
}

func (l *ListViewImpl) isSessionValid(ctx context.Context) bool {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.RLock()
defer l.mu.RUnlock()
log := logger.GetLogger(ctx)
if l.virtualCenter.Client == nil || l.virtualCenter.Client.Client == nil {
return false
Expand All @@ -157,20 +152,26 @@ func (l *ListViewImpl) isSessionValid(ctx context.Context) bool {
return false
}

// IsListViewReady wraps a read lock to access the state of isReady
func (l *ListViewImpl) IsListViewReady() bool {
l.mu.RLock()
defer l.mu.RUnlock()
return l.isReady
}

// AddTask adds task to listView and the internal map
func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjectReference, ch chan TaskResult) error {
log := logger.GetLogger(ctx)
log.Infof("AddTask called for %+v", taskMoRef)

if !l.IsListViewReady() {
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
}

if !l.isSessionValid(ctx) {
log.Infof("current session is not valid")
if err := l.virtualCenter.Connect(ctx); err != nil {
return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err)
}
log.Info("cancelling ongoing listView context to re-create listview object")
if l.waitForUpdatesCancelFunc != nil {
l.waitForUpdatesCancelFunc()
}
l.SetListViewNotReady(ctx)
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
}

l.taskMap.Upsert(taskMoRef, TaskDetails{
Expand All @@ -184,6 +185,7 @@ func (l *ListViewImpl) AddTask(ctx context.Context, taskMoRef types.ManagedObjec
response, err := l.listView.Add(l.ctx, []types.ManagedObjectReference{taskMoRef})
if err != nil {
l.taskMap.Delete(taskMoRef)
l.SetListViewNotReady(ctx)
return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err)
}
if len(response) > 0 {
Expand Down Expand Up @@ -216,23 +218,21 @@ func (l *ListViewImpl) RemoveTask(ctx context.Context, taskMoRef types.ManagedOb
log.Infof("op timeout. context deadline exceeded. using listview context without a timeout")
ctx = l.ctx
}
if l.listView == nil {
return logger.LogNewErrorf(log, "failed to remove task from listView: listView not initialized")

if !l.IsListViewReady() {
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
}

if !l.isSessionValid(ctx) {
log.Infof("current session is not valid")
if err := l.virtualCenter.Connect(ctx); err != nil {
return fmt.Errorf("%w. task: %v, err: %v", ErrListViewTaskAddition, taskMoRef, err)
}
log.Info("cancelling ongoing listView context to re-create listview object")
if l.waitForUpdatesCancelFunc != nil {
l.waitForUpdatesCancelFunc()
}
l.SetListViewNotReady(ctx)
return fmt.Errorf("%w. task: %v, err: listview not ready", ErrListViewTaskAddition, taskMoRef)
}

log.Infof("client is valid. trying to remove task from listview object")
_, err := l.listView.Remove(l.ctx, []types.ManagedObjectReference{taskMoRef})
if err != nil {
l.SetListViewNotReady(ctx)
return logger.LogNewErrorf(log, "failed to remove task %v from ListView. error: %+v", taskMoRef, err)
}
log.Infof("task %+v removed from listView", taskMoRef)
Expand All @@ -241,6 +241,28 @@ func (l *ListViewImpl) RemoveTask(ctx context.Context, taskMoRef types.ManagedOb
return nil
}

func (l *ListViewImpl) SetListViewNotReady(ctx context.Context) {
log := logger.GetLogger(ctx)
log.Infof("waiting for lock before setting listview to not ready")
l.mu.Lock()
adikul30 marked this conversation as resolved.
Show resolved Hide resolved
defer l.mu.Unlock()
log.Infof("acquired lock before setting listview to not ready")
l.isReady = false
if l.waitForUpdatesCancelFunc != nil {
l.waitForUpdatesCancelFunc()
}
log.Info("cancelled context")
}

func (l *ListViewImpl) connect() error {
log := logger.GetLogger(l.ctx)
log.Infof("waiting for lock before calling connect")
l.mu.Lock()
defer l.mu.Unlock()
log.Infof("acquired lock before calling connect")
return l.virtualCenter.Connect(l.ctx)
}

// listenToTaskUpdates is a long-running goroutine
// that uses a property collector to listen for task updates
// CSI ops add CNS tasks to listview and wait for a response from CNS
Expand All @@ -255,19 +277,23 @@ func (l *ListViewImpl) listenToTaskUpdates() {
recreateView := false
for {
// calling Connect at the beginning to ensure the current session is neither nil nor NotAuthenticated
if err := l.virtualCenter.Connect(l.ctx); err != nil {
if err := l.connect(); err != nil {
log.Errorf("failed to connect to vCenter. err: %v", err)
time.Sleep(waitForUpdatesRetry)
continue
} else {
log.Infof("connection to vc successful")
}

log.Infof("attempting lock before re-creating listview")
l.mu.Lock()
log.Infof("acquired lock before re-creating listview")
if recreateView {
adikul30 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("re-creating the listView object")
err := l.createListView(l.ctx, nil)
if err != nil {
log.Errorf("failed to create a ListView. error: %+v", err)
l.mu.Unlock()
continue
}

Expand All @@ -277,7 +303,11 @@ func (l *ListViewImpl) listenToTaskUpdates() {
}

log.Info("Starting listening for task updates...")
log.Infof("waitForUpdatesContext %v", l.waitForUpdatesContext)
pc := property.DefaultCollector(l.virtualCenter.Client.Client)
l.isReady = true
log.Infof("listview ready state is %v", l.isReady)
l.mu.Unlock()
err := property.WaitForUpdatesEx(l.waitForUpdatesContext, pc, filter, func(updates []types.ObjectUpdate) bool {
log.Debugf("Got %d property collector update(s)", len(updates))
for _, update := range updates {
Expand All @@ -300,6 +330,13 @@ func (l *ListViewImpl) listenToTaskUpdates() {
l.virtualCenter.Config.Host)
recreateView = true
l.reportErrorOnAllPendingTasks(err)
log.Info("waiting for lock before setting listview ready state to false")
l.mu.Lock()
log.Info("acquired lock before setting listview ready state to false")
log.Infof("setting listview ready state to false. current ready state: %v", l.isReady)
l.isReady = false
log.Infof("listview ready state is %v", l.isReady)
l.mu.Unlock()
}
// use case: unit tests: this will help us stop listening
// and finish the unit test
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/cns-lib/volume/listview_if.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ type ListViewIf interface {
// MarkTaskForDeletion marks a given task MoRef for deletion by a cleanup goroutine
// use case: failure to remove task due to a vc issue
MarkTaskForDeletion(ctx context.Context, taskMoRef types.ManagedObjectReference) error
// IsListViewReady returns the status of the listview + property collector mechanism
IsListViewReady() bool
// SetListViewNotReady explicitly states the listview state as not ready
// use case: unit tests
SetListViewNotReady(ctx context.Context)
}
18 changes: 18 additions & 0 deletions pkg/common/cns-lib/volume/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ type Manager interface {
string, error)
// GetOperationStore returns the VolumeOperationRequest interface
GetOperationStore() cnsvolumeoperationrequest.VolumeOperationRequest
// IsListViewReady returns the status of the listview + property collector mechanism
IsListViewReady() bool
// SetListViewNotReady explicitly states the listview state as not ready
// use case: unit tests
SetListViewNotReady(ctx context.Context)
}

// CnsVolumeInfo hold information related to volume created by CNS.
Expand Down Expand Up @@ -3020,3 +3025,16 @@ func (m *defaultManager) getAggregatedSnapshotSize(ctx context.Context, volumeID
}
return aggregatedSnapshotCapacity, nil
}

func (m *defaultManager) IsListViewReady() bool {
if m.listViewIf == nil {
return false
}
return m.listViewIf.IsListViewReady()
}

func (m *defaultManager) SetListViewNotReady(ctx context.Context) {
if m.listViewIf != nil {
m.listViewIf.SetListViewNotReady(ctx)
}
}
8 changes: 5 additions & 3 deletions pkg/common/cns-lib/vsphere/virtualcenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,11 @@ func (vc *VirtualCenter) connect(ctx context.Context, requestNewSession bool) er

log.Infof("logging out current session and clearing idle sessions")

err = vc.Client.Logout(ctx)
if err != nil {
log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err)
if vc.Client != nil && vc.Client.Client != nil {
err = vc.Client.Logout(ctx)
if err != nil {
log.Errorf("failed to logout current session. still clearing idle sessions. err: %v", err)
}
}

// If session has expired, create a new instance.
Expand Down
33 changes: 22 additions & 11 deletions pkg/common/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"os"
"sync"
"testing"
"time"

cnssim "github.com/vmware/govmomi/cns/simulator"
"github.com/vmware/govmomi/cns/types"
"github.com/vmware/govmomi/simulator"
"k8s.io/apimachinery/pkg/util/wait"

cnsvolumes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere"
Expand All @@ -29,8 +31,7 @@ var (
)

type commonUtilsTest struct {
config *cnsconfig.Config
vcenter *cnsvsphere.VirtualCenter
volumeManager cnsvolumes.Manager
}

// configFromSim starts a vcsim instance and returns config for use against the vcsim instance.
Expand Down Expand Up @@ -129,9 +130,23 @@ func getCommonUtilsTest(t *testing.T) *commonUtilsTest {
if err != nil {
t.Fatal(err)
}

volumeManager, err := cnsvolumes.GetManager(ctx, virtualCenter, nil, false, false, false, "")
if err != nil {
t.Fatalf("failed to create an instance of volume manager. err=%v", err)
}

// wait till property collector has been started
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, false,
func(ctx context.Context) (done bool, err error) {
return volumeManager.IsListViewReady(), nil
})
if err != nil {
t.Fatalf("listview not ready. err=%v", err)
}

commonUtilsTestInstance = &commonUtilsTest{
config: csiConfig,
vcenter: virtualCenter,
volumeManager: volumeManager,
}
})
return commonUtilsTestInstance
Expand All @@ -141,23 +156,19 @@ func TestQuerySnapshotsUtil(t *testing.T) {
// Create context
commonUtilsTestInstance := getCommonUtilsTest(t)

volumeManager, err := cnsvolumes.GetManager(ctx, commonUtilsTestInstance.vcenter, nil, false, false, false, "")
if err != nil {
t.Fatalf("failed to create an instance of volume manager. err=%v", err)
}

queryFilter := types.CnsSnapshotQueryFilter{
SnapshotQuerySpecs: nil,
Cursor: &types.CnsCursor{
Offset: 0,
Limit: 10,
},
}
queryResultEntries, _, err := QuerySnapshotsUtil(ctx, volumeManager, queryFilter, DefaultQuerySnapshotLimit)
queryResultEntries, _, err := QuerySnapshotsUtil(ctx, commonUtilsTestInstance.volumeManager, queryFilter,
DefaultQuerySnapshotLimit)
if err != nil {
t.Error(err)
}
//TODO: Create Snapshots using CreateSnapshot API.
// TODO: Create Snapshots using CreateSnapshot API.
t.Log("Snapshots: ")
for _, entry := range queryResultEntries {
t.Log(entry)
Expand Down
28 changes: 28 additions & 0 deletions pkg/csi/service/vanilla/controller_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/google/uuid"
cnstypes "github.com/vmware/govmomi/cns/types"
"github.com/vmware/govmomi/find"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/vmware/govmomi/simulator"
Expand Down Expand Up @@ -365,6 +367,16 @@ func getControllerTestWithTopology(t *testing.T) *controllerTestTopology {
if err != nil {
t.Fatalf("failed to create an instance of volume manager. err=%v", err)
}

// wait till property collector has been started
err = wait.PollUntilContextTimeout(ctxtopology, 1*time.Second, 10*time.Second, false,
func(ctx context.Context) (done bool, err error) {
return volumeManager.IsListViewReady(), nil
})
if err != nil {
t.Fatalf("listview not ready. err=%v", err)
}

// GetManager returns a singleton instance of VolumeManager. So, it could be pointing
// to old VC instance as part of previous unit test run from same folder.
// Call ResetManager to get new VolumeManager instance with current VC configuration.
Expand All @@ -373,6 +385,22 @@ func getControllerTestWithTopology(t *testing.T) *controllerTestTopology {
t.Fatalf("failed to reset volume manager with new vcenter. err=%v", err)
}

// as per current logic, new vc object will be saved but not immediately used
// only when we notice an issue adding tasks to listview, we will kill the context to property collector
// causing the listview to be re-created with the newer credentials
// this method is called here to explicitly re-create the listview since we changed the config above for
// topology
volumeManager.SetListViewNotReady(ctxtopology)

// wait again for the property collector to be re-created
err = wait.PollUntilContextTimeout(ctxtopology, 1*time.Second, 10*time.Second, false,
func(ctx context.Context) (done bool, err error) {
return volumeManager.IsListViewReady(), nil
})
if err != nil {
t.Fatalf("listview not ready. err=%v", err)
}

manager := &common.Manager{
VcenterConfig: vcenterconfig,
CnsConfig: config,
Expand Down