diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index dbef2b4..8cf8765 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -490,7 +490,7 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.OutOfRange, "Volume size exceeds the limit specified") } - volume, err := cs.connector.GetVolumeByID(ctx, volumeID) + _, err := cs.connector.GetVolumeByID(ctx, volumeID) if err != nil { if errors.Is(err, cloud.ErrNotFound) { return nil, status.Errorf(codes.NotFound, "Volume %v not found", volumeID) @@ -499,19 +499,6 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.Internal, fmt.Sprintf("GetVolume failed with error %v", err)) } - if volume.Size >= util.GigaBytesToBytes(volSizeGB) { - // A volume was already resized. - logger.Info("Volume has already been expanded", - "volumeID", volumeID, - "volumeSize", volume.Size, - "volumeSizeRequested", volSizeGB) - - return &csi.ControllerExpandVolumeResponse{ - CapacityBytes: volume.Size, - NodeExpansionRequired: true, - }, nil - } - // lock out volumeID for clone and delete operation if err := cs.operationLocks.GetExpandLock(volumeID); err != nil { logger.Error(err, "failed acquiring expand lock", "volumeID", volumeID) @@ -530,15 +517,22 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi "volumeSize", volSizeGB, ) + nodeExpansionRequired := true + // Node expansion is not required for raw block volumes. + volCap := req.GetVolumeCapability() + if volCap != nil && volCap.GetBlock() != nil { + nodeExpansionRequired = false + } + return &csi.ControllerExpandVolumeResponse{ CapacityBytes: util.GigaBytesToBytes(volSizeGB), - NodeExpansionRequired: true, + NodeExpansionRequired: nodeExpansionRequired, }, nil } func (cs *controllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { logger := klog.FromContext(ctx) - logger.V(6).Info("ControllerExpandVolume: called", "args", protosanitizer.StripSecrets(*req)) + logger.V(6).Info("ControllerGetCapabilities: called", "args", protosanitizer.StripSecrets(*req)) resp := &csi.ControllerGetCapabilitiesResponse{ Capabilities: []*csi.ControllerServiceCapability{ diff --git a/pkg/driver/node.go b/pkg/driver/node.go index d71801f..6050af8 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -22,6 +23,13 @@ const ( defaultFsType = FSTypeExt4 ) +var ValidFSTypes = map[string]struct{}{ + FSTypeExt2: {}, + FSTypeExt3: {}, + FSTypeExt4: {}, + FSTypeXfs: {}, +} + type nodeServer struct { csi.UnimplementedNodeServer connector cloud.Interface @@ -70,6 +78,33 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } + // If the access type is block, do nothing for stage + if blk := volCap.GetBlock(); blk != nil { + return &csi.NodeStageVolumeResponse{}, nil + } + + mnt := volCap.GetMount() + if mnt == nil { + return nil, status.Error(codes.InvalidArgument, "NodeStageVolume: mount volume capability not found") + } + + fsType := mnt.GetFsType() + if fsType == "" { + fsType = defaultFsType + } + + _, ok := ValidFSTypes[strings.ToLower(fsType)] + if !ok { + return nil, status.Errorf(codes.InvalidArgument, "NodeStageVolume: invalid fstype %s", fsType) + } + + var mountOptions []string + for _, f := range mnt.GetMountFlags() { + if !hasMountOption(mountOptions, f) { + mountOptions = append(mountOptions, f) + } + } + if acquired := ns.volumeLocks.TryAcquire(volumeID); !acquired { logger.Error(errors.New(util.ErrVolumeOperationAlreadyExistsVolumeID), "failed to acquire volume lock", "volumeID", volumeID) @@ -78,64 +113,69 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol defer ns.volumeLocks.Release(volumeID) // Now, find the device path - - pubCtx := req.GetPublishContext() - deviceID := pubCtx[deviceIDContextKey] - - devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID) + source, err := ns.mounter.GetDevicePath(ctx, volumeID) if err != nil { return nil, status.Errorf(codes.Internal, "Cannot find device path for volume %s: %s", volumeID, err.Error()) } - logger.Info("Device found", - "devicePath", devicePath, - "deviceID", deviceID, + logger.V(4).Info("NodeStageVolume: device found", + "source", source, ) - // If the access type is block, do nothing for stage - if blk := volCap.GetBlock(); blk != nil { - return &csi.NodeStageVolumeResponse{}, nil - } + exists, err := ns.mounter.PathExists(target) + if err != nil { + msg := fmt.Sprintf("failed to check if target %q exists: %v", target, err) - // The access type should now be "Mount". - // We have to format the partition. + return nil, status.Error(codes.Internal, msg) + } + if !exists { + // If target path does not exist we need to create the directory where volume will be staged + logger.V(4).Info("NodeStageVolume: creating target dir", "target", target) + if err = ns.mounter.MakeDir(target); err != nil { + msg := fmt.Sprintf("could not create target dir %q: %v", target, err) - mnt := volCap.GetMount() - if mnt == nil { - return nil, status.Error(codes.InvalidArgument, "Neither block nor mount volume capability") + return nil, status.Error(codes.Internal, msg) + } } - // Verify whether mounted - notMnt, err := ns.mounter.IsLikelyNotMountPoint(target) + // Check if a device is mounted in target directory + device, _, err := ns.mounter.GetDeviceName(target) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + msg := fmt.Sprintf("failed to check if volume is already mounted: %v", err) + + return nil, status.Error(codes.Internal, msg) } - fsType := mnt.GetFsType() - if fsType == "" { - fsType = defaultFsType + // This operation (NodeStageVolume) MUST be idempotent. + // If the volume corresponding to the volume_id is already staged to the staging_target_path, + // and is identical to the specified volume_capability the Plugin MUST reply 0 OK. + logger.V(4).Info("NodeStageVolume: checking if volume is already staged", "device", device, "source", source, "target", target) + if device == source { + logger.V(4).Info("NodeStageVolume: volume already staged", "volumeID", volumeID) + + return &csi.NodeStageVolumeResponse{}, nil } - var mountOptions []string - for _, f := range mnt.GetMountFlags() { - if !hasMountOption(mountOptions, f) { - mountOptions = append(mountOptions, f) - } + logger.V(4).Info("NodeStageVolume: staging volume", "source", source, "volumeID", volumeID, "target", target, "fstype", fsType, "options", mountOptions) + err = ns.mounter.FormatAndMount(source, target, fsType, mountOptions) + if err != nil { + msg := fmt.Sprintf("could not format %q and mount it at %q: %v", source, target, err) + + return nil, status.Error(codes.Internal, msg) } - // Volume Mount - if notMnt { - logger.Info("NodeStageVolume: formatting and mounting", - "devicePath", devicePath, - "target", target, - "fsType", fsType, - "options", mountOptions, - ) - err = ns.mounter.FormatAndMount(devicePath, target, fsType, mountOptions) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + needResize, err := ns.mounter.NeedResize(source, target) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not determine if volume %q (%q) needs to be resized: %v", volumeID, source, err) + } + + if needResize { + logger.V(2).Info("NodeStageVolume: volume needs resizing", "source", source) + if _, err := ns.mounter.Resize(source, target); err != nil { + return nil, status.Errorf(codes.Internal, "could not resize volume %q (%q): %v", volumeID, source, err) } } + logger.V(4).Info("NodeStageVolume: successfully staged volume", "source", source, "volumeID", volumeID, "target", target, "fstype", fsType) return &csi.NodeStageVolumeResponse{}, nil } @@ -158,7 +198,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag logger.V(6).Info("NodeUnstageVolume: called", "args", *req) // Check parameters - volumeID := req.GetVolumeId() if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID not provided") @@ -176,125 +215,171 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag } defer ns.volumeLocks.Release(volumeID) - notMnt, err := ns.mounter.IsLikelyNotMountPoint(target) + // Check if target directory is a mount point. GetDeviceNameFromMount + // given a mnt point, finds the device from /proc/mounts + // returns the device name, reference count, and error code + dev, refCount, err := ns.mounter.GetDeviceName(target) if err != nil { - if os.IsNotExist(err) { - return nil, status.Error(codes.NotFound, "Target path not found") - } + msg := fmt.Sprintf("failed to check if target %q is a mount point: %v", target, err) - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, msg) } - if notMnt { + + // From the spec: If the volume corresponding to the volume_id + // is not staged to the staging_target_path, the Plugin MUST + // reply 0 OK. + if refCount == 0 { + logger.V(4).Info("NodeUnstageVolume: target not mounted", "target", target) + return &csi.NodeUnstageVolumeResponse{}, nil } - logger.Info("NodeUnstageVolume: unmounting", - "target", target, - ) + if refCount > 1 { + logger.V(4).Info("NodeUnstageVolume: found references to device mounted at target path", "refCount", refCount, "device", dev, "target", target) + } + + logger.V(4).Info("NodeUnstageVolume: unmounting", "target", target) - err = ns.mounter.CleanupMountPoint(target, true) + err = ns.mounter.Unstage(target) if err != nil { return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", target, err) } - logger.Info("NodeUnstageVolume: unmount successful", + logger.V(4).Info("NodeUnstageVolume: unmount successful", "target", target, + "volumeID", volumeID, ) return &csi.NodeUnstageVolumeResponse{}, nil } -func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { //nolint:gocognit +func (ns *nodeServer) isMounted(ctx context.Context, target string) (bool, error) { + logger := klog.FromContext(ctx) + + notMnt, err := ns.mounter.IsLikelyNotMountPoint(target) + if err != nil { + if os.IsNotExist(err) { + return false, err + } + + // Checking if the path exists and error is related to Corrupted Mount, in that case, the system could unmount and mount. + _, pathErr := ns.mounter.PathExists(target) + if pathErr != nil && ns.mounter.IsCorruptedMnt(pathErr) { + logger.V(4).Info("NodePublishVolume: Target path is a corrupted mount. Trying to unmount.", "target", target) + if mntErr := ns.mounter.Unpublish(target); mntErr != nil { + return false, fmt.Errorf("unable to unmount the target %q : %w", target, mntErr) + } + + // After successful unmount, the device is ready to be mounted. + return false, nil + } + + return false, fmt.Errorf("could not check if %q is a mount point: %w, %w", target, err, pathErr) + } + + return !notMnt, nil +} + +func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { //nolint:gocyclo,gocognit logger := klog.FromContext(ctx) logger.V(6).Info("NodePublishVolume: called", "args", *req) // Check arguments - if req.GetVolumeCapability() == nil { - return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") - } - if req.GetVolumeId() == "" { + volumeID := req.GetVolumeId() + if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - volumeID := req.GetVolumeId() - if req.GetTargetPath() == "" { - return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + source := req.GetStagingTargetPath() + if source == "" { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") } - targetPath := req.GetTargetPath() - if req.GetVolumeCapability().GetBlock() != nil && - req.GetVolumeCapability().GetMount() != nil { - return nil, status.Error(codes.InvalidArgument, "Cannot have both block and mount access type") + target := req.GetTargetPath() + if target == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } - if req.GetStagingTargetPath() == "" { - return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + + volCap := req.GetVolumeCapability() + if volCap == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") } - readOnly := req.GetReadonly() - options := []string{"bind"} - if readOnly { - options = append(options, "ro") + if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) { + return nil, status.Error(codes.InvalidArgument, "Volume capability not supported") } - deviceID := "" - if req.GetPublishContext() != nil { - deviceID = req.GetPublishContext()[deviceIDContextKey] + mountOptions := []string{"bind"} + if req.GetReadonly() { + mountOptions = append(mountOptions, "ro") } // Considering kubelet ensures the stage and publish operations // are serialized, we don't need any extra locking in NodePublishVolume. - if req.GetVolumeCapability().GetMount() != nil { //nolint:nestif - source := req.GetStagingTargetPath() - - notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) + switch req.GetVolumeCapability().GetAccessType().(type) { + case *csi.VolumeCapability_Mount: + mounted, err := ns.isMounted(ctx, target) if err != nil { if os.IsNotExist(err) { - if err := ns.mounter.MakeDir(targetPath); err != nil { - return nil, status.Errorf(codes.Internal, "Could not create dir %q: %v", targetPath, err) + if err := ns.mounter.MakeDir(target); err != nil { + return nil, status.Errorf(codes.Internal, "Could not create dir %q: %v", target, err) } } else { - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Errorf(codes.Internal, "Could not check if %q is mounted: %v", target, err) } } - if !notMnt { + + if mounted { logger.Info("NodePublishVolume: volume is already mounted", "source", source, - "targetPath", targetPath, + "target", target, ) return &csi.NodePublishVolumeResponse{}, nil } - fsType := req.GetVolumeCapability().GetMount().GetFsType() + mnt := volCap.GetMount() + if mnt == nil { + return nil, status.Error(codes.InvalidArgument, "NodePublishVolume: mount volume capability not found") + } + if mnt := volCap.GetMount(); mnt != nil { + for _, f := range mnt.GetMountFlags() { + if !hasMountOption(mountOptions, f) { + mountOptions = append(mountOptions, f) + } + } + } - mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags() + fsType := mnt.GetFsType() + if fsType == "" { + fsType = defaultFsType + } - logger.Info("NodePublishVolume: mounting source", + _, ok := ValidFSTypes[strings.ToLower(fsType)] + if !ok { + return nil, status.Errorf(codes.InvalidArgument, "NodePublishVolume: invalid fstype %s", fsType) + } + + logger.V(4).Info("NodePublishVolume: mounting source", "source", source, - "targetPath", targetPath, + "target", target, "fsType", fsType, - "deviceID", deviceID, - "readOnly", readOnly, + "mountOptions", mountOptions, "volumeID", volumeID, - "mountFlags", mountFlags, ) - if err := ns.mounter.Mount(source, targetPath, fsType, options); err != nil { - return nil, status.Errorf(codes.Internal, "failed to mount %s at %s: %s", source, targetPath, err.Error()) + if err := ns.mounter.Mount(source, target, fsType, mountOptions); err != nil { + return nil, status.Errorf(codes.Internal, "failed to mount %q at %q: %v", source, target, err) } - } - - if req.GetVolumeCapability().GetBlock() != nil { //nolint:nestif - volumeID := req.GetVolumeId() - - devicePath, err := ns.mounter.GetDevicePath(ctx, volumeID) + case *csi.VolumeCapability_Block: + source, err := ns.mounter.GetDevicePath(ctx, volumeID) if err != nil { - return nil, status.Errorf(codes.Internal, "Cannot find device path for volume %s: %s", volumeID, err.Error()) + return nil, status.Errorf(codes.Internal, "Cannot find device path for volume %s: %v", volumeID, err) } - globalMountPath := filepath.Dir(targetPath) - exists, err := ns.mounter.ExistsPath(globalMountPath) + globalMountPath := filepath.Dir(target) + exists, err := ns.mounter.PathExists(globalMountPath) if err != nil { return nil, status.Errorf(codes.Internal, "Could not check if path exists %q: %v", globalMountPath, err) } @@ -304,25 +389,45 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } } - err = ns.mounter.MakeFile(targetPath) - if err != nil { - if removeErr := os.Remove(targetPath); removeErr != nil { - return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", targetPath, removeErr) + mounted, err := ns.isMounted(ctx, target) + if err != nil { //nolint:nestif + if os.IsNotExist(err) { + // Create the mount point as a file since bind mount device node requires it to be a file + logger.V(4).Info("NodePublishVolume: making target file", "target", target) + err = ns.mounter.MakeFile(target) + if err != nil { + if removeErr := os.Remove(target); removeErr != nil { + return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr) + } + + return nil, status.Errorf(codes.Internal, "Could not create file %q: %v", target, err) + } + } else { + return nil, status.Errorf(codes.Internal, "Could not check if %q is mounted: %v", target, err) } + } - return nil, status.Errorf(codes.Internal, "Could not create file %q: %v", targetPath, err) + if mounted { + logger.Info("NodePublishVolume: volume is already mounted", + "source", source, + "target", target, + ) + + return &csi.NodePublishVolumeResponse{}, nil } logger.Info("NodePublishVolume: mounting device", - "devicePath", devicePath, - "targetPath", targetPath, - "deviceID", deviceID, - "readOnly", readOnly, + "source", source, + "target", target, "volumeID", volumeID, ) - if err := ns.mounter.Mount(devicePath, targetPath, "", options); err != nil { - return nil, status.Errorf(codes.Internal, "failed to mount %s at %s: %s", devicePath, targetPath, err.Error()) + if err := ns.mounter.Mount(source, target, "", mountOptions); err != nil { + if removeErr := os.Remove(target); removeErr != nil { + return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr) + } + + return nil, status.Errorf(codes.Internal, "failed to mount %q at %q: %v", source, target, err) } } @@ -337,36 +442,24 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") } - targetPath := req.GetTargetPath() - if targetPath == "" { + target := req.GetTargetPath() + if target == "" { return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } // Considering that kubelet ensures the stage and publish operations // are serialized, we don't need any extra locking in NodeUnpublishVolume. - if _, err := ns.connector.GetVolumeByID(ctx, volumeID); errors.Is(err, cloud.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Volume %v not found", volumeID) - } else if err != nil { - // Error with CloudStack - return nil, status.Errorf(codes.Internal, "Error %v", err) - } - - logger.Info("NodeUnpublishVolume: unmounting volume", - "targetPath", targetPath, + logger.V(4).Info("NodeUnpublishVolume: unmounting volume", + "target", target, "volumeID", volumeID, ) - err := ns.mounter.CleanupMountPoint(targetPath, true) + err := ns.mounter.Unpublish(target) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err) + return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", target, err) } - logger.Info("NodeUnpublishVolume: unmounting successful", - "targetPath", targetPath, - "volumeID", volumeID, - ) - return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -419,13 +512,34 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV } volCap := req.GetVolumeCapability() - if volCap != nil { + if volCap != nil { //nolint:nestif + caps := []*csi.VolumeCapability{volCap} + if !isValidVolumeCapabilities(caps) { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("VolumeCapability is invalid: %v", volCap)) + } + switch volCap.GetAccessType().(type) { //nolint:gocritic case *csi.VolumeCapability_Block: logger.Info("Filesystem expansion is skipped for block volumes") return &csi.NodeExpandVolumeResponse{}, nil } + } else { + // VolumeCapability is nil, check if volumePath point to a block device + isBlock, err := ns.mounter.IsBlockDevice(volumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to determine if volumePath [%v] is a block device: %v", volumePath, err) + } + if isBlock { + // Skip resizing for Block NodeExpandVolume + bcap, err := ns.mounter.GetBlockSizeBytes(volumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.GetVolumePath(), err) + } + logger.V(4).Info("NodeExpandVolume: called, since given volumePath is a block device, ignoring...", "volumeID", volumeID, "volumePath", volumePath) + + return &csi.NodeExpandVolumeResponse{CapacityBytes: bcap}, nil + } } if acquired := ns.volumeLocks.TryAcquire(volumeID); !acquired { @@ -455,12 +569,87 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV "volumePath", volumePath, ) - r := ns.mounter.NewResizeFs(mount.New()) - if _, err := r.Resize(devicePath, volumePath); err != nil { - return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) + if _, err := ns.mounter.Resize(devicePath, volumePath); err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q (%q): %v", volumeID, devicePath, err) + } + + bcap, err := ns.mounter.GetBlockSizeBytes(devicePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.GetVolumePath(), err) } - return &csi.NodeExpandVolumeResponse{}, nil + return &csi.NodeExpandVolumeResponse{CapacityBytes: bcap}, nil +} + +func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + logger := klog.FromContext(ctx) + logger.V(6).Info("NodeGetVolumeStats: called", "args", *req) + + if req.GetVolumeId() == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + + // Get volume path + // This should work for Kubernetes >= 1.26, see https://github.com/kubernetes/kubernetes/issues/115343 + volumePath := req.GetStagingTargetPath() + if volumePath == "" { + // Except that it doesn't work in the sanity test, so we need a fallback to volumePath. + volumePath = req.GetVolumePath() + } + if len(volumePath) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume path not provided") + } + + exists, err := ns.mounter.PathExists(volumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "unknown error when stat on %s: %v", volumePath, err) + } + if !exists { + return nil, status.Errorf(codes.NotFound, "path %s does not exist", volumePath) + } + + isBlock, err := ns.mounter.IsBlockDevice(volumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to determine if %q is block device: %s", volumePath, err) + } + + if isBlock { + bcap, blockErr := ns.mounter.GetBlockSizeBytes(req.GetVolumePath()) + if blockErr != nil { + return nil, status.Errorf(codes.Internal, "failed to get block capacity on path %s: %v", req.GetVolumePath(), blockErr) + } + + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Unit: csi.VolumeUsage_BYTES, + Total: bcap, + }, + }, + }, nil + } + + stats, err := ns.mounter.GetStatistics(volumePath) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to retrieve capacity statistics for volume path %q: %s", volumePath, err) + } + + return &csi.NodeGetVolumeStatsResponse{ + Usage: []*csi.VolumeUsage{ + { + Available: stats.AvailableBytes, + Total: stats.TotalBytes, + Used: stats.UsedBytes, + Unit: csi.VolumeUsage_BYTES, + }, + { + Available: stats.AvailableInodes, + Total: stats.TotalInodes, + Used: stats.UsedInodes, + Unit: csi.VolumeUsage_INODES, + }, + }, + }, nil } func (ns *nodeServer) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { @@ -480,6 +669,13 @@ func (ns *nodeServer) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapab }, }, }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }, }, } diff --git a/pkg/driver/topology.go b/pkg/driver/topology.go index 0cedfb2..bd6bfa7 100644 --- a/pkg/driver/topology.go +++ b/pkg/driver/topology.go @@ -33,7 +33,7 @@ func (t Topology) ToCSI() *csi.Topology { segments := make(map[string]string) segments[ZoneKey] = t.ZoneID if t.HostID != "" { - segments[ZoneKey] = t.ZoneID + segments[HostKey] = t.HostID } return &csi.Topology{ diff --git a/pkg/mount/fake.go b/pkg/mount/fake.go index 0dd9c78..8129033 100644 --- a/pkg/mount/fake.go +++ b/pkg/mount/fake.go @@ -5,13 +5,15 @@ import ( "os" "k8s.io/mount-utils" - utilsexec "k8s.io/utils/exec" exec "k8s.io/utils/exec/testing" ) +const ( + giB = 1 << 30 +) + type fakeMounter struct { mount.SafeFormatAndMount - utilsexec.Interface } // NewFake creates a fake implementation of the @@ -22,12 +24,11 @@ func NewFake() Interface { Interface: mount.NewFakeMounter([]mount.MountPoint{}), Exec: &exec.FakeExec{DisableScripts: true}, }, - utilsexec.New(), } } -func (m *fakeMounter) CleanupMountPoint(path string, extensiveCheck bool) error { - return mount.CleanupMountPoint(path, m, extensiveCheck) +func (m *fakeMounter) GetBlockSizeBytes(_ string) (int64, error) { + return 1073741824, nil } func (m *fakeMounter) GetDevicePath(_ context.Context, _ string) (string, error) { @@ -38,7 +39,13 @@ func (m *fakeMounter) GetDeviceName(mountPath string) (string, int, error) { return mount.GetDeviceNameFromMount(m, mountPath) } -func (*fakeMounter) ExistsPath(_ string) (bool, error) { +func (*fakeMounter) PathExists(path string) (bool, error) { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil } @@ -53,10 +60,52 @@ func (*fakeMounter) MakeDir(pathname string) error { return nil } -func (*fakeMounter) MakeFile(_ string) error { +func (*fakeMounter) MakeFile(pathname string) error { + file, err := os.OpenFile(pathname, os.O_CREATE, os.FileMode(0o644)) + if err != nil { + if !os.IsExist(err) { + return err + } + } + if err = file.Close(); err != nil { + return err + } + return nil } -func (*fakeMounter) NewResizeFs(_ utilsexec.Interface) *mount.ResizeFs { - return mount.NewResizeFs(New()) +func (m *fakeMounter) GetStatistics(_ string) (volumeStatistics, error) { + return volumeStatistics{ + AvailableBytes: 3 * giB, + TotalBytes: 10 * giB, + UsedBytes: 7 * giB, + + AvailableInodes: 3000, + TotalInodes: 10000, + UsedInodes: 7000, + }, nil +} + +func (m *fakeMounter) IsBlockDevice(_ string) (bool, error) { + return false, nil +} + +func (m *fakeMounter) IsCorruptedMnt(_ error) bool { + return false +} + +func (m *fakeMounter) NeedResize(_ string, _ string) (bool, error) { + return false, nil +} + +func (m *fakeMounter) Resize(_ string, _ string) (bool, error) { + return true, nil +} + +func (m *fakeMounter) Unpublish(path string) error { + return m.Unstage(path) +} + +func (m *fakeMounter) Unstage(path string) error { + return mount.CleanupMountPoint(path, m, true) } diff --git a/pkg/mount/mount.go b/pkg/mount/mount.go index 63aa90e..6e45aaf 100644 --- a/pkg/mount/mount.go +++ b/pkg/mount/mount.go @@ -6,14 +6,17 @@ import ( "context" "fmt" "os" + "os/exec" "path/filepath" + "strconv" "strings" "time" + "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" - "k8s.io/utils/exec" + kexec "k8s.io/utils/exec" ) const ( @@ -22,39 +25,57 @@ const ( // Interface defines the set of methods to allow for // mount operations on a system. -type Interface interface { +type Interface interface { //nolint:interfacebloat mount.Interface - exec.Interface FormatAndMount(source string, target string, fstype string, options []string) error - - CleanupMountPoint(path string, extensiveCheck bool) error + GetBlockSizeBytes(devicePath string) (int64, error) GetDevicePath(ctx context.Context, volumeID string) (string, error) GetDeviceName(mountPath string) (string, int, error) - ExistsPath(filename string) (bool, error) + GetStatistics(volumePath string) (volumeStatistics, error) + IsBlockDevice(devicePath string) (bool, error) + IsCorruptedMnt(err error) bool MakeDir(pathname string) error MakeFile(pathname string) error - NewResizeFs(exec exec.Interface) *mount.ResizeFs + NeedResize(devicePath string, deviceMountPath string) (bool, error) + PathExists(path string) (bool, error) + Resize(devicePath, deviceMountPath string) (bool, error) + Unpublish(path string) error + Unstage(path string) error } type mounter struct { - mount.SafeFormatAndMount - exec.Interface + *mount.SafeFormatAndMount +} + +type volumeStatistics struct { + AvailableBytes, TotalBytes, UsedBytes int64 + AvailableInodes, TotalInodes, UsedInodes int64 } // New creates an implementation of the mount.Interface. func New() Interface { return &mounter{ - mount.SafeFormatAndMount{ + &mount.SafeFormatAndMount{ Interface: mount.New(""), - Exec: exec.New(), + Exec: kexec.New(), }, - exec.New(), } } -func (m *mounter) CleanupMountPoint(path string, extensiveCheck bool) error { - return mount.CleanupMountPoint(path, m, extensiveCheck) +// GetBlockSizeBytes gets the size of the disk in bytes. +func (m *mounter) GetBlockSizeBytes(devicePath string) (int64, error) { + output, err := m.Exec.Command("blockdev", "--getsize64", devicePath).Output() + if err != nil { + return -1, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %w", devicePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse size %s as int", strOut) + } + + return gotSizeBytes, nil } func (m *mounter) GetDevicePath(ctx context.Context, volumeID string) (string, error) { @@ -150,14 +171,8 @@ func diskUUIDToSerial(uuid string) string { return uuidWithoutHyphen[:20] } -func (*mounter) ExistsPath(filename string) (bool, error) { - if _, err := os.Stat(filename); os.IsNotExist(err) { - return false, nil - } else if err != nil { - return false, err - } - - return true, nil +func (*mounter) PathExists(path string) (bool, error) { + return mount.PathExists(path) } func (*mounter) MakeDir(pathname string) error { @@ -185,6 +200,82 @@ func (*mounter) MakeFile(pathname string) error { return nil } -func (*mounter) NewResizeFs(_ exec.Interface) *mount.ResizeFs { - return mount.NewResizeFs(New()) +// Resize resizes the filesystem of the given devicePath. +func (m *mounter) Resize(devicePath, deviceMountPath string) (bool, error) { + return mount.NewResizeFs(m.Exec).Resize(devicePath, deviceMountPath) +} + +// NeedResize checks if the filesystem of the given devicePath needs to be resized. +func (m *mounter) NeedResize(devicePath string, deviceMountPath string) (bool, error) { + return mount.NewResizeFs(m.Exec).NeedResize(devicePath, deviceMountPath) +} + +// GetStatistics gathers statistics on the volume. +func (m *mounter) GetStatistics(volumePath string) (volumeStatistics, error) { + isBlock, err := m.IsBlockDevice(volumePath) + if err != nil { + return volumeStatistics{}, fmt.Errorf("failed to determine if volume %s is block device: %w", volumePath, err) + } + + if isBlock { + // See http://man7.org/linux/man-pages/man8/blockdev.8.html for details + output, err := exec.Command("blockdev", "getsize64", volumePath).CombinedOutput() + if err != nil { + return volumeStatistics{}, fmt.Errorf("error when getting size of block volume at path %s: output: %s, err: %w", volumePath, string(output), err) + } + strOut := strings.TrimSpace(string(output)) + gotSizeBytes, err := strconv.ParseInt(strOut, 10, 64) + if err != nil { + return volumeStatistics{}, fmt.Errorf("failed to parse size %s into int", strOut) + } + + return volumeStatistics{ + TotalBytes: gotSizeBytes, + }, nil + } + + var statfs unix.Statfs_t + // See http://man7.org/linux/man-pages/man2/statfs.2.html for details. + err = unix.Statfs(volumePath, &statfs) + if err != nil { + return volumeStatistics{}, err + } + + volStats := volumeStatistics{ + AvailableBytes: int64(statfs.Bavail) * int64(statfs.Bsize), //nolint:unconvert + TotalBytes: int64(statfs.Blocks) * int64(statfs.Bsize), //nolint:unconvert + UsedBytes: (int64(statfs.Blocks) - int64(statfs.Bfree)) * int64(statfs.Bsize), //nolint:unconvert + + AvailableInodes: int64(statfs.Ffree), + TotalInodes: int64(statfs.Files), + UsedInodes: int64(statfs.Files) - int64(statfs.Ffree), + } + + return volStats, nil +} + +// IsBlockDevice checks if the given path is a block device. +func (m *mounter) IsBlockDevice(devicePath string) (bool, error) { + var stat unix.Stat_t + err := unix.Stat(devicePath, &stat) + if err != nil { + return false, err + } + + return (stat.Mode & unix.S_IFMT) == unix.S_IFBLK, nil +} + +// IsCorruptedMnt return true if err is about corrupted mount point. +func (m *mounter) IsCorruptedMnt(err error) bool { + return mount.IsCorruptedMnt(err) +} + +// Unpublish unmounts the given path. +func (m *mounter) Unpublish(path string) error { + return m.Unstage(path) +} + +// Unstage unmounts the given path. +func (m *mounter) Unstage(path string) error { + return mount.CleanupMountPoint(path, m, true) } diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index ee1f4b8..2a6780b 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -35,6 +35,8 @@ func TestSanity(t *testing.T) { config.TestVolumeParameters = map[string]string{ driver.DiskOfferingKey: "9743fd77-0f5d-4ef9-b2f8-f194235c769c", } + config.IdempotentCount = 5 + config.TestNodeVolumeAttachLimit = true logger := klog.Background() ctx := klog.NewContext(context.Background(), logger)