Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP:Add informers for volume snapshot #3045

Open
wants to merge 1 commit into
base: storage-quota-m2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ func (im *InformerManager) AddPVCListener(ctx context.Context, add func(obj inte
return nil
}

// AddSnapshotListener hooks up add, update, delete callbacks.
func (im *InformerManager) AddSnapshotListener(ctx context.Context, add func(obj interface{}),
update func(oldObj, newObj interface{}), remove func(obj interface{})) error {
log := logger.GetLogger(ctx)
if im.snapshotInformer == nil {
im.snapshotInformer = im.snapInformerFactory.VolumeSnapshots().Informer()
}
im.snapshotSynced = im.snapshotInformer.HasSynced
_, err := im.snapshotInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: add,
UpdateFunc: update,
DeleteFunc: remove,
})
if err != nil {
return logger.LogNewErrorf(log, "failed to add event handler on Snapshot listener. Error: %v", err)
}
return nil
}

// AddPVListener hooks up add, update, delete callbacks.
func (im *InformerManager) AddPVListener(ctx context.Context, add func(obj interface{}),
update func(oldObj, newObj interface{}), remove func(obj interface{})) error {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kubernetes

import (
volumesnapshotinformers "github.com/kubernetes-csi/external-snapshotter/client/v6/informers/externalversions/volumesnapshot/v1"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -50,6 +51,8 @@ type InformerManager struct {
client clientset.Interface
// main shared informer factory
informerFactory informers.SharedInformerFactory

snapInformerFactory volumesnapshotinformers.Interface
// main signal
stopCh (<-chan struct{})

Expand All @@ -71,6 +74,11 @@ type InformerManager struct {
// Function to determine if pvcInformer has been synced
pvcSynced cache.InformerSynced

// VolumeSnapshot informer
snapshotInformer cache.SharedInformer
// Function to determine if snapshotInformer has been synced
snapshotSynced cache.InformerSynced

// namespaceInformer informer
namespaceInformer cache.SharedInformer
// Function to determine if namespaceInformer has been synced
Expand Down
65 changes: 64 additions & 1 deletion pkg/syncer/metadatasyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

snap "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
cnsoperatorv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
storagepolicyv1alpha2 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/storagepolicy/v1alpha2"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/migration"
Expand Down Expand Up @@ -573,7 +574,22 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl
if err != nil {
return logger.LogNewErrorf(log, "failed to listen on pods. Error: %v", err)
}

// Set up kubernetes resource listeners for metadata syncer.
metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sClient, true)
err = metadataSyncer.k8sInformerManager.AddSnapshotListener(
ctx,
func(obj interface{}) { // Add.
snapshotAdded(obj, metadataSyncer)
},
func(oldObj interface{}, newObj interface{}) { // Update.
snapshotUpdated(oldObj, newObj, metadataSyncer)
},
func(obj interface{}) { // Delete.
snapshotDeleted(obj, metadataSyncer)
})
if err != nil {
return logger.LogNewErrorf(log, "failed to listen on VolumeSnapshots. Error: %v", err)
}
metadataSyncer.pvLister = metadataSyncer.k8sInformerManager.GetPVLister()
metadataSyncer.pvcLister = metadataSyncer.k8sInformerManager.GetPVCLister()
metadataSyncer.podLister = metadataSyncer.k8sInformerManager.GetPodLister()
Expand Down Expand Up @@ -1802,6 +1818,53 @@ func pvcDeleted(obj interface{}, metadataSyncer *metadataSyncInformer) {
}
}

// snapshotAdded
func snapshotAdded(obj interface{}, metadataSyncer *metadataSyncInformer) {
_, log := logger.GetNewContextWithLogger()
log.Info("VolumeSnapshotAdded: Volume Snapshot Updated")
volSnap, ok := obj.(*snap.VolumeSnapshot)
if volSnap == nil || !ok {
return
}
log.Infof("Volume Sansphot Added: %+v", volSnap)
}

// snapshotUpdated
func snapshotUpdated(oldObj, newObj interface{}, metadataSyncer *metadataSyncInformer) {
_, log := logger.GetNewContextWithLogger()
log.Info("VolumeSnapshotUpdated: Volume Snapshot Updated")
// Get old and new pvc objects.
oldSnap, ok := oldObj.(*snap.VolumeSnapshot)
if oldSnap == nil || !ok {
return
}
newSnap, ok := newObj.(*snap.VolumeSnapshot)
if newSnap == nil || !ok {
return
}
log.Infof("VolumeSnapshotUpdated: Volume Snapshot Updated from %+v to %+v", oldSnap, newSnap)
if *newSnap.Status.ReadyToUse {
log.Infof("PVCUpdated: New PVC not in Bound phase")
return
}
log.Infof("Sansphot Updated: %+v", newSnap)
}

// snapshotDeleted
func snapshotDeleted(obj interface{}, metadataSyncer *metadataSyncInformer) {
_, log := logger.GetNewContextWithLogger()
log.Info("VolumeSnapshotDeleted: Volume Snapshot Deleted")
volSnap, ok := obj.(*snap.VolumeSnapshot)
if volSnap == nil || !ok {
log.Warnf("VolumeSnapshotDeleted: unrecognized object %+v", obj)
return
}
log.Debugf("VolumeSnapshotDeleted: %+v", volSnap)
if *volSnap.Status.ReadyToUse {
return
}
}

// pvUpdated updates volume metadata on VC when volume labels on K8S cluster
// have been updated.
func pvUpdated(oldObj, newObj interface{}, metadataSyncer *metadataSyncInformer) {
Expand Down