WorkloadDomainIsolation: Static volume support on supervisor (#3017)
shalini-b authored Sep 4, 2024
1 parent e9f8a2f commit e03934b
Showing 3 changed files with 144 additions and 55 deletions.
6 changes: 5 additions & 1 deletion pkg/csi/service/common/commonco/k8sorchestrator/topology.go
@@ -1690,8 +1690,12 @@ func (volTopology *wcpControllerVolumeTopology) GetTopologyInfoFromNodes(ctx con
// In VC 9.0, if StorageTopologyType is not set, all the zones the selected datastore
// is accessible from will be added as node affinity terms on the PV even if the zones
// are not associated with the namespace of the PVC.
// This code block runs for static as well as dynamic volume provisioning case.
case "":
// This code block runs for static as well as dynamic volume provisioning case.
// TopoSegToDatastoresMap will be nil in case of static volume provisioning.
if params.TopoSegToDatastoresMap == nil {
params.TopoSegToDatastoresMap = make(map[string][]*cnsvsphere.DatastoreInfo)
}
var selectedSegments []map[string]string
for zone, clusters := range azClustersMap {
if _, exists := params.TopoSegToDatastoresMap[zone]; !exists {
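As context for the hunk above: reading from a nil Go map is safe, but writing to one panics, so the static-provisioning path, which arrives without a pre-populated TopoSegToDatastoresMap, has to allocate the map before per-zone entries are (presumably, in the collapsed code below the hunk) filled in. A minimal, self-contained sketch of that pattern; the DatastoreInfo type and the URL are illustrative stand-ins, not the driver's actual cnsvsphere types:

```go
package main

import "fmt"

// DatastoreInfo is a hypothetical stand-in for cnsvsphere.DatastoreInfo,
// used only to keep this sketch self-contained.
type DatastoreInfo struct {
	URL string
}

// ensureTopoSegMap mirrors the nil check added above: a nil map can be read
// from, but writing to it panics, so the static-provisioning path allocates
// the map lazily before zones are recorded into it.
func ensureTopoSegMap(m map[string][]*DatastoreInfo) map[string][]*DatastoreInfo {
	if m == nil {
		m = make(map[string][]*DatastoreInfo)
	}
	return m
}

func main() {
	// Static provisioning arrives with no pre-populated map.
	var topoSegToDatastores map[string][]*DatastoreInfo
	topoSegToDatastores = ensureTopoSegMap(topoSegToDatastores)
	topoSegToDatastores["zone-a"] = append(topoSegToDatastores["zone-a"],
		&DatastoreInfo{URL: "ds://example-datastore"})
	fmt.Println(len(topoSegToDatastores["zone-a"])) // 1
}
```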
@@ -74,6 +74,9 @@ var (

topologyMgr commoncotypes.ControllerTopologyService
clusterComputeResourceMoIds []string
// workloadDomainIsolationEnabled determines if the workload domain
// isolation feature is available on a supervisor cluster.
workloadDomainIsolationEnabled bool
)

// Add creates a new CnsRegisterVolume Controller and adds it to the Manager,
@@ -86,6 +89,9 @@ func Add(mgr manager.Manager, clusterFlavor cnstypes.CnsClusterFlavor,
log.Debug("Not initializing the CnsRegisterVolume Controller as its a non-WCP CSI deployment")
return nil
}
workloadDomainIsolationEnabled = commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx,
common.WorkloadDomainIsolation)

var volumeInfoService cnsvolumeinfo.VolumeInfoService
if clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.TKGsHA) {
@@ -216,7 +222,7 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
timeout = backOffDuration[instance.Name]
backOffDurationMapMutex.Unlock()

// If the CnsRegistereVolume instance is already registered, remove the
// If the CnsRegisterVolume instance is already registered, remove the
// instance from the queue.
if instance.Status.Registered {
backOffDurationMapMutex.Lock()
@@ -298,18 +304,20 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
return reconcile.Result{RequeueAfter: timeout}, nil
}

if syncer.IsPodVMOnStretchSupervisorFSSEnabled && len(clusterComputeResourceMoIds) > 1 {
azClustersMap := topologyMgr.GetAZClustersMap(ctx)
isAccessible := isDatastoreAccessibleToAZClusters(ctx, vc, azClustersMap, volume.DatastoreUrl)
if !isAccessible {
log.Errorf("Volume: %s present on datastore: %s is not accessible to any of the AZ clusters: %v",
volumeID, volume.DatastoreUrl, azClustersMap)
setInstanceError(ctx, r, instance, "Volume in the spec is not accessible to any of the AZ clusters")
_, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false)
if err != nil {
log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err)
if syncer.IsPodVMOnStretchSupervisorFSSEnabled {
if workloadDomainIsolationEnabled || len(clusterComputeResourceMoIds) > 1 {
azClustersMap := topologyMgr.GetAZClustersMap(ctx)
isAccessible := isDatastoreAccessibleToAZClusters(ctx, vc, azClustersMap, volume.DatastoreUrl)
if !isAccessible {
log.Errorf("Volume: %s present on datastore: %s is not accessible to any of the AZ clusters: %v",
volumeID, volume.DatastoreUrl, azClustersMap)
setInstanceError(ctx, r, instance, "Volume in the spec is not accessible to any of the AZ clusters")
_, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false)
if err != nil {
log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err)
}
return reconcile.Result{RequeueAfter: timeout}, nil
}
return reconcile.Result{RequeueAfter: timeout}, nil
}
} else {
// Verify if the volume is accessible to Supervisor cluster.
@@ -338,39 +346,6 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
return reconcile.Result{RequeueAfter: timeout}, nil
}

if syncer.IsPodVMOnStretchSupervisorFSSEnabled && len(clusterComputeResourceMoIds) > 1 {
// Calculate accessible topology for the provisioned volume.
datastoreAccessibleTopology, err := topologyMgr.GetTopologyInfoFromNodes(ctx,
commoncotypes.WCPRetrieveTopologyInfoParams{
DatastoreURL: volume.DatastoreUrl,
StorageTopologyType: "zonal",
TopologyRequirement: nil,
Vc: vc})
if err != nil {
msg := fmt.Sprintf("failed to find volume topology. Error: %v", err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
matchExpressions := make([]v1.NodeSelectorRequirement, 0)
for key, value := range datastoreAccessibleTopology[0] {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
})
}
pvNodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
}
}

k8sclient, err := k8s.NewClient(ctx)
if err != nil {
log.Errorf("Failed to initialize K8S client when registering the CnsRegisterVolume "+
@@ -392,6 +367,77 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
log.Infof("Volume with storagepolicyId: %s is mapping to K8S storage class: %s and assigned to namespace: %s",
volume.StoragePolicyId, storageClassName, request.Namespace)

sc, err := k8sclient.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{})
if err != nil {
msg := fmt.Sprintf("Failed to fetch StorageClass: %q with error: %+v", storageClassName, err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}

// Calculate accessible topology for the provisioned volume.
var datastoreAccessibleTopology []map[string]string
if syncer.IsPodVMOnStretchSupervisorFSSEnabled {
if workloadDomainIsolationEnabled {
datastoreAccessibleTopology, err = topologyMgr.GetTopologyInfoFromNodes(ctx,
commoncotypes.WCPRetrieveTopologyInfoParams{
DatastoreURL: volume.DatastoreUrl,
StorageTopologyType: sc.Parameters["StorageTopologyType"],
TopologyRequirement: nil,
Vc: vc})
} else if len(clusterComputeResourceMoIds) > 1 {
datastoreAccessibleTopology, err = topologyMgr.GetTopologyInfoFromNodes(ctx,
commoncotypes.WCPRetrieveTopologyInfoParams{
DatastoreURL: volume.DatastoreUrl,
StorageTopologyType: "zonal",
TopologyRequirement: nil,
Vc: vc})
}
if err != nil {
msg := fmt.Sprintf("failed to find volume topology. Error: %v", err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}

// Create node affinity terms from datastoreAccessibleTopology.
var terms []v1.NodeSelectorTerm
if workloadDomainIsolationEnabled {
for _, topologyTerms := range datastoreAccessibleTopology {

var expressions []v1.NodeSelectorRequirement
for key, value := range topologyTerms {
expressions = append(expressions, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
})
}
terms = append(terms, v1.NodeSelectorTerm{
MatchExpressions: expressions,
})
}
} else {
matchExpressions := make([]v1.NodeSelectorRequirement, 0)
for key, value := range datastoreAccessibleTopology[0] {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
})
}
terms = append(terms, v1.NodeSelectorTerm{
MatchExpressions: matchExpressions,
})
}

pvNodeAffinity = &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: terms,
},
}
}

capacityInMb := volume.BackingObjectDetails.GetCnsBackingObjectDetails().CapacityInMb
accessMode := instance.Spec.AccessMode
// Set accessMode to ReadWriteOnce if DiskURLPath is used for import.
@@ -436,9 +482,15 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
return reconcile.Result{RequeueAfter: timeout}, nil
}
// Create PVC mapping to above created PV.
log.Infof("Now creating pvc: %s", instance.Spec.PvcName)
pvcSpec := getPersistentVolumeClaimSpec(instance.Spec.PvcName, instance.Namespace, capacityInMb,
storageClassName, accessMode, pvName)
log.Infof("Creating PVC: %s", instance.Spec.PvcName)
pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb,
storageClassName, accessMode, pvName, datastoreAccessibleTopology)
if err != nil {
msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
log.Debugf("PVC spec is: %+v", pvcSpec)
pvc, err := k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx,
pvcSpec, metav1.CreateOptions{})
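The Reconcile changes above build node affinity for the registered PV in two shapes: with workload domain isolation enabled, every topology segment returned for the datastore becomes its own NodeSelectorTerm (Kubernetes ORs the terms, so the PV is schedulable in any zone that can reach the datastore); otherwise only the first segment is used, as before. A self-contained sketch of the multi-term shape, assuming the k8s.io/api/core/v1 types already used by the driver; the zone label key and zone names are illustrative, not necessarily the keys the driver emits:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// buildNodeAffinity sketches how a slice of topology segments (one map per
// zone the datastore is reachable from) becomes PV node affinity: each
// segment is its own NodeSelectorTerm, and the terms are ORed by Kubernetes.
func buildNodeAffinity(segments []map[string]string) *v1.VolumeNodeAffinity {
	var terms []v1.NodeSelectorTerm
	for _, segment := range segments {
		var exprs []v1.NodeSelectorRequirement
		for key, value := range segment {
			exprs = append(exprs, v1.NodeSelectorRequirement{
				Key:      key,
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{value},
			})
		}
		terms = append(terms, v1.NodeSelectorTerm{MatchExpressions: exprs})
	}
	return &v1.VolumeNodeAffinity{
		Required: &v1.NodeSelector{NodeSelectorTerms: terms},
	}
}

func main() {
	affinity := buildNodeAffinity([]map[string]string{
		{"topology.kubernetes.io/zone": "zone-a"}, // illustrative label key
		{"topology.kubernetes.io/zone": "zone-b"},
	})
	fmt.Printf("%d node selector terms\n", len(affinity.Required.NodeSelectorTerms)) // 2
}
```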
45 changes: 39 additions & 6 deletions pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go
@@ -18,6 +18,7 @@ package cnsregistervolume

import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
@@ -72,18 +73,30 @@ func isDatastoreAccessibleToAZClusters(ctx context.Context, vc *vsphere.VirtualC
azClustersMap map[string][]string, datastoreURL string) bool {
log := logger.GetLogger(ctx)
for _, clusterIDs := range azClustersMap {
var found bool
for _, clusterID := range clusterIDs {
sharedDatastores, _, err := vsphere.GetCandidateDatastoresInCluster(ctx, vc, clusterID, false)
if err != nil {
log.Warnf("Failed to get candidate datastores for cluster: %s with err: %+v", clusterID, err)
continue
}
found = false
for _, ds := range sharedDatastores {
if ds.Info.Url == datastoreURL {
log.Infof("Found datastoreUrl: %s is accessible to cluster: %s", datastoreURL, clusterID)
return true
found = true
}
}
// If datastoreURL was found in the list of datastores accessible to the
// cluster with clusterID, continue checking for the rest of the clusters
// in AZ. Otherwise, break and check the next AZ in azClustersMap.
if !found {
break
}
}
// datastoreURL was found in all the clusters with clusterIDs.
if found {
return true
}
}
return false
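The rewrite of isDatastoreAccessibleToAZClusters above tightens the rule: the old code returned true as soon as any single cluster could see the datastore, while the new code requires the datastore to be visible to every cluster of at least one availability zone. A toy, self-contained sketch of that rule, with a hypothetical in-memory fixture standing in for GetCandidateDatastoresInCluster:

```go
package main

import "fmt"

// datastoresByCluster is a hypothetical fixture standing in for what
// GetCandidateDatastoresInCluster would return per cluster.
var datastoresByCluster = map[string][]string{
	"cluster-1": {"ds-shared", "ds-local-1"},
	"cluster-2": {"ds-shared"},
	"cluster-3": {"ds-local-3"},
}

// accessibleToSomeAZ mirrors the revised check: a datastore qualifies only if
// every cluster of at least one availability zone can reach it.
func accessibleToSomeAZ(azClusters map[string][]string, datastoreURL string) bool {
	for _, clusterIDs := range azClusters {
		foundInAll := len(clusterIDs) > 0
		for _, clusterID := range clusterIDs {
			foundHere := false
			for _, ds := range datastoresByCluster[clusterID] {
				if ds == datastoreURL {
					foundHere = true
					break
				}
			}
			if !foundHere {
				foundInAll = false
				break
			}
		}
		if foundInAll {
			return true
		}
	}
	return false
}

func main() {
	azClusters := map[string][]string{
		"zone-a": {"cluster-1", "cluster-2"}, // both see ds-shared
		"zone-b": {"cluster-3"},
	}
	fmt.Println(accessibleToSomeAZ(azClusters, "ds-shared"))  // true (zone-a)
	fmt.Println(accessibleToSomeAZ(azClusters, "ds-local-3")) // true (zone-b)
	fmt.Println(accessibleToSomeAZ(azClusters, "ds-local-1")) // false
}
```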
@@ -264,13 +277,33 @@ func getPersistentVolumeSpec(volumeName string, volumeID string, capacity int64,

// getPersistentVolumeClaimSpec return the PersistentVolumeClaim spec with
// specified storage class.
func getPersistentVolumeClaimSpec(name string, namespace string, capacity int64,
storageClassName string, accessMode v1.PersistentVolumeAccessMode, pvName string) *v1.PersistentVolumeClaim {
func getPersistentVolumeClaimSpec(ctx context.Context, name string, namespace string, capacity int64,
storageClassName string, accessMode v1.PersistentVolumeAccessMode, pvName string,
datastoreAccessibleTopology []map[string]string) (*v1.PersistentVolumeClaim, error) {

log := logger.GetLogger(ctx)
capacityInMb := strconv.FormatInt(capacity, 10) + "Mi"
var (
segmentsArray []string
topoAnnotation = make(map[string]string)
)
if datastoreAccessibleTopology != nil {
for _, topologyTerm := range datastoreAccessibleTopology {
jsonSegment, err := json.Marshal(topologyTerm)
if err != nil {
return nil, logger.LogNewErrorf(log,
"failed to marshal topology segment: %+v to json. Error: %+v", topologyTerm, err)
}
segmentsArray = append(segmentsArray, string(jsonSegment))
}
topoAnnotation[common.AnnVolumeAccessibleTopology] = "[" + strings.Join(segmentsArray, ",") + "]"
}

claim := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Name: name,
Namespace: namespace,
Annotations: topoAnnotation,
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
Expand All @@ -285,7 +318,7 @@ func getPersistentVolumeClaimSpec(name string, namespace string, capacity int64,
VolumeName: pvName,
},
}
return claim
return claim, nil
}

// isPVCBound return true if the PVC is bound before timeout.
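For reference, the topology annotation assembled in getPersistentVolumeClaimSpec above is a JSON array built by marshalling each segment on its own and joining the pieces. A standalone sketch of that assembly; the annotation key below is an assumption standing in for common.AnnVolumeAccessibleTopology, and the zone label key is illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// annVolumeAccessibleTopology is an assumed key for illustration; the driver
// reads the real value from common.AnnVolumeAccessibleTopology.
const annVolumeAccessibleTopology = "csi.vsphere.volume-accessible-topology"

// buildTopologyAnnotation marshals each segment to JSON and joins the parts
// into a single JSON array string, the same shape the PVC annotation carries.
func buildTopologyAnnotation(segments []map[string]string) (map[string]string, error) {
	if len(segments) == 0 {
		return nil, nil
	}
	var parts []string
	for _, segment := range segments {
		b, err := json.Marshal(segment)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal segment %+v: %w", segment, err)
		}
		parts = append(parts, string(b))
	}
	return map[string]string{
		annVolumeAccessibleTopology: "[" + strings.Join(parts, ",") + "]",
	}, nil
}

func main() {
	ann, _ := buildTopologyAnnotation([]map[string]string{
		{"topology.kubernetes.io/zone": "zone-a"},
		{"topology.kubernetes.io/zone": "zone-b"},
	})
	fmt.Println(ann[annVolumeAccessibleTopology])
	// [{"topology.kubernetes.io/zone":"zone-a"},{"topology.kubernetes.io/zone":"zone-b"}]
}
```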
