Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi): add support for ReadWriteOncePod access mode #3236

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions csi/controller_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,280 +69,285 @@
accessModes: getVolumeCapabilityAccessModes(
[]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
}),
log: logrus.StandardLogger().WithField("component", "csi-controller-server"),
}
}

func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
log := cs.log.WithFields(logrus.Fields{"function": "CreateVolume"})

log.Infof("CreateVolume is called with req %+v", req)

volumeID := util.AutoCorrectName(req.GetName(), datastore.NameMaximumLength)
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}
volumeCaps := req.GetVolumeCapabilities()
if err := cs.validateVolumeCapabilities(volumeCaps); err != nil {
return nil, err
}
volumeParameters := req.GetParameters()
if volumeParameters == nil {
volumeParameters = map[string]string{}
}
var reqVolSizeBytes int64
if req.GetCapacityRange() != nil {
reqVolSizeBytes = req.GetCapacityRange().GetRequiredBytes()
}
if reqVolSizeBytes < util.MinimalVolumeSize {
log.Infof("Volume %s requested capacity %v is smaller than minimal capacity %v, enforcing minimal capacity.", volumeID, reqVolSizeBytes, util.MinimalVolumeSize)
reqVolSizeBytes = util.MinimalVolumeSize
}
// Round up to multiple of 2 * 1024 * 1024
reqVolSizeBytes = util.RoundUpSize(reqVolSizeBytes)

volumeSource := req.GetVolumeContentSource()
if volumeSource != nil {
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
if snapshot := volumeSource.GetSnapshot(); snapshot != nil {
csiSnapshotType, sourceVolumeName, id := decodeSnapshotID(snapshot.SnapshotId)
switch csiSnapshotType {
case csiSnapshotTypeLonghornBackingImage:
backingImageParameters := decodeSnapshoBackingImageID(snapshot.SnapshotId)
if backingImageParameters[longhorn.BackingImageParameterName] == "" || backingImageParameters[longhorn.BackingImageParameterDataSourceType] == "" {
return nil, status.Errorf(codes.InvalidArgument, "invalid CSI snapshotHandle %v for backing image", snapshot.SnapshotId)
}
updateVolumeParamsForBackingImage(volumeParameters, backingImageParameters)
case csiSnapshotTypeLonghornSnapshot:
if id == "" {
return nil, status.Errorf(codes.NotFound, "volume source snapshot %v is not found", snapshot.SnapshotId)
}
dataSource, _ := types.NewVolumeDataSource(longhorn.VolumeDataSourceTypeSnapshot, map[string]string{types.VolumeNameKey: sourceVolumeName, types.SnapshotNameKey: id})
volumeParameters["dataSource"] = string(dataSource)
case csiSnapshotTypeLonghornBackup:
if id == "" {
return nil, status.Errorf(codes.NotFound, "volume source snapshot %v is not found", snapshot.SnapshotId)
}
backupVolume, backupName := sourceVolumeName, id
bv, err := cs.apiClient.BackupVolume.ById(backupVolume)
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to restore CSI snapshot %s backup volume %s unavailable", snapshot.SnapshotId, backupVolume)
}

backup, err := cs.apiClient.BackupVolume.ActionBackupGet(bv, &longhornclient.BackupInput{Name: backupName})
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to restore CSI snapshot %v backup %s unavailable", snapshot.SnapshotId, backupName)
}

// use the fromBackup method for the csi snapshot restores as well
// the same parameter was previously only used for restores based on the storage class
volumeParameters["fromBackup"] = backup.Url
default:
return nil, status.Errorf(codes.InvalidArgument, "invalid CSI snapshot type: %v. Must be %v, %v or %v",
csiSnapshotType, csiSnapshotTypeLonghornSnapshot, csiSnapshotTypeLonghornBackup, csiSnapshotTypeLonghornBackingImage)
}
}
case *csi.VolumeContentSource_Volume:
if srcVolume := volumeSource.GetVolume(); srcVolume != nil {
longhornSrcVol, err := cs.apiClient.Volume.ById(srcVolume.VolumeId)
if err != nil {
return nil, status.Errorf(codes.NotFound, "failed to clone volume: source volume %s is unavailable", srcVolume.VolumeId)
}
if longhornSrcVol == nil {
return nil, status.Errorf(codes.NotFound, "failed to clone volume: source volume %s is not found", srcVolume.VolumeId)
}

// check size of source and requested
srcVolSizeBytes, err := strconv.ParseInt(longhornSrcVol.Size, 10, 64)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
if reqVolSizeBytes != srcVolSizeBytes {
return nil, status.Errorf(codes.OutOfRange, "failed to clone volume: the requested size (%v bytes) is different than the source volume size (%v bytes)", reqVolSizeBytes, srcVolSizeBytes)
}

dataSource, _ := types.NewVolumeDataSource(longhorn.VolumeDataSourceTypeVolume, map[string]string{types.VolumeNameKey: srcVolume.VolumeId})
volumeParameters["dataSource"] = string(dataSource)
}
default:
return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
}
} else {
// Refuse to create a NEW XFS volume smaller than 300 MiB, since mkfs.xfs will eventually fail in the node
// server. Don't refuse for clones/restores though, as they may have an existing filesystem.
for _, cap := range req.VolumeCapabilities {
if cap.GetMount().GetFsType() == "xfs" && reqVolSizeBytes < util.MinimalVolumeSizeXFS {
return nil, fmt.Errorf("XFS filesystems with size %d, smaller than %d, are not supported",
reqVolSizeBytes, util.MinimalVolumeSizeXFS)
}
}
}

existVol, err := cs.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if existVol != nil {
log.Infof("Volume %v already exists with name %v", volumeID, existVol.Name)

exVolSize, err := util.ConvertSize(existVol.Size)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

if exVolSize != reqVolSizeBytes {
return nil, status.Errorf(codes.AlreadyExists, "volume %s size %v differs from requested size %v", existVol.Name, exVolSize, reqVolSizeBytes)
}

// pass through the volume content source in case this volume is in the process of being created.
// We won't wait for clone/restore to complete but return OK immediately here so that
// if Kubernetes wants to abort/delete the cloning/restoring volume, it has the volume ID and is able to do so.
// We will wait for clone/restore to complete inside ControllerPublishVolume.
rsp := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: existVol.Id,
CapacityBytes: exVolSize,
VolumeContext: volumeParameters,
ContentSource: volumeSource,
},
}

return rsp, nil
}

// regardless of the used storage class, if this is requested in rwx mode
// we need to mark the volume as a shared volume
for _, cap := range volumeCaps {
if requiresSharedAccess(nil, cap) {
volumeParameters["share"] = "true"
break
}
if requireExclusiveAccess(nil, cap) {
volumeParameters["exclusive"] = "true"
break
}
}

vol, err := getVolumeOptions(volumeID, volumeParameters)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

if err = cs.checkAndPrepareBackingImage(volumeID, vol.BackingImage, volumeParameters); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

vol.Name = volumeID
vol.Size = fmt.Sprintf("%d", reqVolSizeBytes)

log.Infof("Creating a volume by API client, name: %s, size: %s, accessMode: %v, dataEngine: %v",
vol.Name, vol.Size, vol.AccessMode, vol.DataEngine)
resVol, err := cs.apiClient.Volume.Create(vol)
// TODO: implement error response code for Longhorn API to differentiate different error type.
// For example, creating a volume from a non-existing snapshot should return codes.NotFound instead of codes.Internal
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

checkVolumeCreated := func(vol *longhornclient.Volume) bool {
// This condition should be enough to return OK to the CSI CreateVolume request.
// The volume may be still downloading data from backup target or from a different volume,
// and we will wait for this process to finish before allowing the workload pod to use the volume
// in the ControllerPublishVolume call
return vol.State != "" && vol.State != string(longhorn.VolumeStateCreating)
}

if !cs.waitForVolumeState(resVol.Id, "volume created", checkVolumeCreated, true, false) {
return nil, status.Error(codes.DeadlineExceeded, "failed to wait for volume creation to complete")
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: resVol.Id,
CapacityBytes: reqVolSizeBytes,
VolumeContext: volumeParameters,
ContentSource: volumeSource,
},
}, nil
}

Check notice on line 272 in csi/controller_server.go

View check run for this annotation

codefactor.io / CodeFactor

csi/controller_server.go#L79-L272

Complex Method
func (cs *ControllerServer) checkAndPrepareBackingImage(volumeName, backingImageName string, volumeParameters map[string]string) error {
if backingImageName == "" {
return nil
}

bidsType := volumeParameters[longhorn.BackingImageParameterDataSourceType]
biChecksum := volumeParameters[longhorn.BackingImageParameterChecksum]
bidsParameters := map[string]string{}
if bidsParametersStr := volumeParameters[longhorn.BackingImageParameterDataSourceParameters]; bidsParametersStr != "" {
if err := json.Unmarshal([]byte(bidsParametersStr), &bidsParameters); err != nil {
return fmt.Errorf("volume %s is unable to create missing backing image with parameters %s: %v", volumeName, bidsParametersStr, err)
}
}

// There will be an empty BackingImage object rather than nil returned even if there is an error
existingBackingImage, err := cs.apiClient.BackingImage.ById(backingImageName)
if err != nil && !strings.Contains(err.Error(), "not found") {
return fmt.Errorf("volume %s is unable to retrieve backing image %s: %v", volumeName, backingImageName, err)
}
// A new backing image will be created automatically
// if there is no existing backing image with the name and the type is `download` or `export-from-volume`.
if existingBackingImage == nil || existingBackingImage.Name == "" {
switch longhorn.BackingImageDataSourceType(bidsType) {
case longhorn.BackingImageDataSourceTypeUpload:
return fmt.Errorf("volume %s backing image type %v is not supported via CSI", volumeName, bidsType)
case longhorn.BackingImageDataSourceTypeDownload:
if bidsParameters[longhorn.DataSourceTypeDownloadParameterURL] == "" {
return fmt.Errorf("volume %s missing parameters %v for preparing backing image",
volumeName, longhorn.DataSourceTypeDownloadParameterURL)
}
case longhorn.BackingImageDataSourceTypeExportFromVolume:
if bidsParameters[longhorn.DataSourceTypeExportParameterExportType] == "" || bidsParameters[longhorn.DataSourceTypeExportParameterVolumeName] == "" {
return fmt.Errorf("volume %s missing parameters %v or %v for preparing backing image",
volumeName, longhorn.DataSourceTypeExportParameterExportType, longhorn.DataSourceTypeExportParameterVolumeName)
}
default:
return fmt.Errorf("volume %s backing image type %v is not supported via CSI", volumeName, bidsType)
}

backingImage := &longhornclient.BackingImage{
Name: backingImageName,
ExpectedChecksum: biChecksum,
SourceType: bidsType,
Parameters: bidsParameters,
}

if minNumberOfCopies, ok := volumeParameters[longhorn.BackingImageParameterMinNumberOfCopies]; ok {
mnoc, err := strconv.Atoi(minNumberOfCopies)
if err != nil || mnoc < 0 {
return errors.Wrap(err, "invalid parameter minNumberOfCopies of backing image")
}
backingImage.MinNumberOfCopies = int64(mnoc)
}

if nodeSelector, ok := volumeParameters[longhorn.BackingImageParameterNodeSelector]; ok {
backingImage.NodeSelector = strings.Split(nodeSelector, ",")
}
if diskSelector, ok := volumeParameters[longhorn.BackingImageParameterDiskSelector]; ok {
backingImage.DiskSelector = strings.Split(diskSelector, ",")
}

_, err = cs.apiClient.BackingImage.Create(backingImage)
return err
}

if (bidsType != "" && bidsType != existingBackingImage.SourceType) || (len(bidsParameters) != 0 && !reflect.DeepEqual(existingBackingImage.Parameters, bidsParameters)) {
return fmt.Errorf("existing backing image %v data source is different from the parameters in the creation request or StorageClass", backingImageName)
}
if biChecksum != "" {
if (existingBackingImage.CurrentChecksum != "" && existingBackingImage.CurrentChecksum != biChecksum) ||
(existingBackingImage.ExpectedChecksum != "" && existingBackingImage.ExpectedChecksum != biChecksum) {
return fmt.Errorf("existing backing image %v expected checksum or current checksum doesn't match the specified checksum %v in the request", backingImageName, biChecksum)
}
}

return nil
}

Check notice on line 350 in csi/controller_server.go

View check run for this annotation

codefactor.io / CodeFactor

csi/controller_server.go#L273-L350

Complex Method
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
log := cs.log.WithFields(logrus.Fields{"function": "DeleteVolume"})

Expand Down Expand Up @@ -410,88 +415,95 @@
}

// ControllerPublishVolume will attach the volume to the specified node
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
log := cs.log.WithFields(logrus.Fields{"function": "ControllerPublishVolume"})

log.Infof("ControllerPublishVolume is called with req %+v", req)

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

nodeID := req.GetNodeId()
if nodeID == "" {
return nil, status.Error(codes.InvalidArgument, "node id missing in request")
}

volumeCapability := req.GetVolumeCapability()
if volumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
}

// TODO: #1875 API returns error instead of not found, so we cannot differentiate between a retrieval failure and non existing resource
if _, err := cs.apiClient.Node.ById(nodeID); err != nil {
return nil, status.Errorf(codes.NotFound, "node %s not found", nodeID)
}

volume, err := cs.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if volume == nil {
return nil, status.Errorf(codes.NotFound, "volume %s not found", volumeID)
}

if volume.Frontend != string(longhorn.VolumeFrontendBlockDev) {
return nil, status.Errorf(codes.InvalidArgument, "volume %s invalid frontend type %s", volumeID, volume.Frontend)
}

if requiresSharedAccess(volume, volumeCapability) {
volume, err = cs.updateVolumeAccessMode(volume, longhorn.AccessModeReadWriteMany)
if err != nil {
return nil, err
}
}

if requireExclusiveAccess(volume, volumeCapability) {
volume, err = cs.updateVolumeAccessMode(volume, longhorn.AccessModeReadWriteOncePod)
if err != nil {
return nil, err
}
}

// TODO: JM Restore should be handled by the volume attach call, consider returning `codes.Aborted`
// TODO: JM should readiness be handled by the caller?
// Most of the readiness conditions are covered by the attach, except auto attachment which requires changes to the design
// should be handled by the processing of the api return codes
if !volume.Ready {
return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID)
}

attachmentID := generateAttachmentID(volumeID, nodeID)

return cs.publishVolume(volume, nodeID, attachmentID, func() error {
checkVolumePublished := func(vol *longhornclient.Volume) bool {
isRegularRWXVolume := vol.AccessMode == string(longhorn.AccessModeReadWriteMany) && !vol.Migratable
attachment, ok := vol.VolumeAttachment.Attachments[attachmentID]
if isRegularRWXVolume {
return ok && attachment.Satisfied
}
return ok && attachment.Satisfied && isVolumeAvailableOn(vol, nodeID)
}
if !cs.waitForVolumeState(volumeID, "volume published", checkVolumePublished, false, false) {
// check if there is error while attaching
if existVol, err := cs.apiClient.Volume.ById(volumeID); err == nil && existVol != nil {
if attachment, ok := existVol.VolumeAttachment.Attachments[attachmentID]; ok {
for _, condition := range attachment.Conditions {
if condition.Type == longhorn.AttachmentStatusConditionTypeSatisfied && condition.Status == string(longhorn.ConditionStatusFalse) && condition.Message != "" {
return status.Errorf(codes.Internal, "volume %v failed to attach to node %v with attachmentID %v: %v", volumeID, nodeID, attachmentID, condition.Message)
}
}
}
}
return status.Errorf(codes.DeadlineExceeded, "volume %v failed to attach to node %v with attachmentID %v", volumeID, nodeID, attachmentID)
}
return nil
})
}

// We pick the same name as the volume attachment object at
// https://github.com/kubernetes/kubernetes/blob/f1e74f77ff88abb7acf0fb0e86ba21bc0f2395c9/pkg/volume/csi/csi_attacher.go#L653-L656

Check notice on line 506 in csi/controller_server.go

View check run for this annotation

codefactor.io / CodeFactor

csi/controller_server.go#L418-L506

Complex Method
func generateAttachmentID(volName, nodeID string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, types.LonghornDriverName, nodeID)))
return fmt.Sprintf("csi-%x", result)
Expand Down Expand Up @@ -767,106 +779,106 @@
return createSnapshotResponseForSnapshotTypeLonghornSnapshot(vol.Name, snapshotID, snapshotCR), nil
}

func (cs *ControllerServer) createCSISnapshotTypeLonghornBackup(req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
log := cs.log.WithFields(logrus.Fields{"function": "createCSISnapshotTypeLonghornBackup"})

csiLabels := req.Parameters
csiSnapshotName := req.GetName()
csiVolumeName := req.GetSourceVolumeId()
if len(csiVolumeName) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume name must be provided")
} else if len(csiSnapshotName) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot name must be provided")
}

// We check for backup existence first, since it's possible that the actual volume is no longer available but the
// backup still is.
backup, err := cs.getBackup(csiVolumeName, csiSnapshotName)
if err != nil {
// Status code set in waitForBackupControllerSync.
return nil, err
}

if backup != nil {
// Per the CSI spec, if we are unable to complete the CreateSnapshot call successfully, we must return a non-ok
// gRPC code. In practice, doing so ensures we get requeued (and quickly deleted) when we hit
// https://github.com/kubernetes-csi/external-snapshotter/issues/880.
if backup.Error != "" {
return nil, status.Error(codes.Internal, backup.Error)
}

snapshotID := encodeSnapshotID(csiSnapshotTypeLonghornBackup, backup.VolumeName, backup.Name)
rsp := createSnapshotResponseForSnapshotTypeLonghornBackup(backup.VolumeName, snapshotID,
backup.SnapshotCreated, backup.VolumeSize, backup.State == string(longhorn.BackupStateCompleted))
return rsp, nil
}

existVol, err := cs.apiClient.Volume.ById(csiVolumeName)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if existVol == nil {
return nil, status.Errorf(codes.NotFound, "volume %s not found", csiVolumeName)
}

var snapshotCR *longhornclient.SnapshotCR
snapshotCRs, err := cs.apiClient.Volume.ActionSnapshotCRList(existVol)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
for _, snap := range snapshotCRs.Data {
if snap.Name == csiSnapshotName {
snapshotCR = &snap
break
}
}

// no existing backup and no local snapshot, create a new one
if snapshotCR == nil {
log.Infof("Creating Volume %s snapshot %s", existVol.Name, csiSnapshotName)
snapshotCR, err = cs.apiClient.Volume.ActionSnapshotCRCreate(existVol, &longhornclient.SnapshotCRInput{
Labels: csiLabels,
Name: csiSnapshotName,
})
// failed to create snapshot, so there is no way to backup
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

// wait for the snapshot creation to be fully finished
snapshotCR, err = cs.waitForSnapshotToBeReady(snapshotCR.Name, existVol.Name)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// create backup based on local volume snapshot
log.Infof("Creating volume %s backup for snapshot %s", existVol.Name, csiSnapshotName)
existVol, err = cs.apiClient.Volume.ActionSnapshotBackup(existVol, &longhornclient.SnapshotInput{
Labels: csiLabels,
Name: csiSnapshotName,
})
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// Get the newly created backup. Don't wait for the backup controller to actually start the backup and update the
// status. It's possible that the backup operation can't actually be completed, but we need to return quickly so the
// CO can unfreeze I/O (if freezing is supported) and without error (if possible) so the CO knows our ID and can use
// it in future calls.
backup, err = cs.waitForBackupControllerSync(existVol.Name, csiSnapshotName)
if err != nil {
// Status code set in waitForBackupControllerSync.
return nil, err
}

log.Infof("Volume %s backup %s of snapshot %s created", existVol.Name, backup.Id, csiSnapshotName)
snapshotID := encodeSnapshotID(csiSnapshotTypeLonghornBackup, existVol.Name, backup.Id)
rsp := createSnapshotResponseForSnapshotTypeLonghornBackup(existVol.Name, snapshotID, snapshotCR.CreationTime,
existVol.Size, backup.State == string(longhorn.BackupStateCompleted))
return rsp, nil
}

Check notice on line 881 in csi/controller_server.go

View check run for this annotation

codefactor.io / CodeFactor

csi/controller_server.go#L782-L881

Complex Method
func createSnapshotResponseForSnapshotTypeLonghornSnapshot(sourceVolumeName, snapshotID string, snapshotCR *longhornclient.SnapshotCR) *csi.CreateSnapshotResponse {
creationTime, err := toProtoTimestamp(snapshotCR.CreationTime)
if err != nil {
Expand Down Expand Up @@ -1060,97 +1072,97 @@
return nil, status.Error(codes.Unimplemented, "")
}

func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
log := cs.log.WithFields(logrus.Fields{"function": "ControllerExpandVolume"})

log.Infof("ControllerExpandVolume is called with req %+v", req)

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

if req.CapacityRange == nil {
return nil, status.Error(codes.InvalidArgument, "capacity range missing in request")
}
requestedSize := req.CapacityRange.GetRequiredBytes()

if req.VolumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "volume capacity missing in request")
}
isAccessModeMount := req.VolumeCapability.GetMount() != nil

existVol, err := cs.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}
if existVol == nil {
return nil, status.Errorf(codes.NotFound, "volume %s missing", volumeID)
}
if len(existVol.Controllers) != 1 {
return nil, status.Errorf(codes.InvalidArgument, "volume %s invalid controller count %v", volumeID, len(existVol.Controllers))
}
if existVol.State != string(longhorn.VolumeStateDetached) && existVol.State != string(longhorn.VolumeStateAttached) {
return nil, status.Errorf(codes.FailedPrecondition, "volume %s invalid state %v for controller volume expansion", volumeID, existVol.State)
}
existingSize, err := strconv.ParseInt(existVol.Size, 10, 64)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}

isOnlineExpansion := existVol.State == string(longhorn.VolumeStateAttached)

if existVol, err = cs.apiClient.Volume.ActionExpand(existVol, &longhornclient.ExpandInput{
Size: strconv.FormatInt(requestedSize, 10),
}); err != nil {
// TODO: This manual error code parsing should be refactored once Longhorn API implements error code response
// https://github.com/longhorn/longhorn/issues/1875
if matched, _ := regexp.MatchString("failed to schedule .* more bytes to disk", err.Error()); matched {
return nil, status.Errorf(codes.OutOfRange, "%v", err)
}
return nil, status.Errorf(codes.Internal, "%v", err)
}

// kubernetes doesn't support volume shrinking and the csi spec specifies to return true
// in the case where the current capacity is bigger or equal to the requested capacity
// that's why we return the volumeSize below instead of the requested capacity
volumeExpansionComplete := func(vol *longhornclient.Volume) bool {
engineReady := false
if len(vol.Controllers) > 0 {
engine := vol.Controllers[0]
engineSize, _ := strconv.ParseInt(engine.Size, 10, 64)
engineReady = engineSize >= requestedSize && !engine.IsExpanding
}
size, _ := strconv.ParseInt(vol.Size, 10, 64)
return size >= requestedSize && engineReady
}

// we wait for completion of the expansion, to ensure that longhorn and kubernetes state are in sync
// should this time out kubernetes will retry the expansion call since the call is idempotent
// we will exit early if the volume already has the requested size
if !cs.waitForVolumeState(volumeID, "volume expansion", volumeExpansionComplete, false, false) {
return nil, status.Errorf(codes.DeadlineExceeded, "volume %s expansion from existing capacity %v to requested capacity %v failed",
volumeID, existingSize, requestedSize)
}

volumeSize, err := strconv.ParseInt(existVol.Size, 10, 64)
if err != nil {
return nil, status.Errorf(codes.Internal, "%v", err)
}

if !isOnlineExpansion {
log.Info("Skip NodeExpandVolume since this is offline expansion, the filesystem resize will be handled by NodeStageVolume when there is a workload using the volume.")
}
if !isAccessModeMount {
log.Info("Skip NodeExpandVolume since the current volume is access mode block")
}

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: volumeSize,
NodeExpansionRequired: isAccessModeMount && isOnlineExpansion,
}, nil
}

Check notice on line 1165 in csi/controller_server.go

View check run for this annotation

codefactor.io / CodeFactor

csi/controller_server.go#L1075-L1165

Complex Method
func (cs *ControllerServer) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
Expand Down
33 changes: 31 additions & 2 deletions csi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,26 @@

if isShared {
vol.AccessMode = string(longhorn.AccessModeReadWriteMany)
} else {
vol.AccessMode = string(longhorn.AccessModeReadWriteOnce)
}
}

if exclusive, ok := volOptions["exclusive"]; ok {
isExclusive, err := strconv.ParseBool(exclusive)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter exclusive")
}
if isExclusive && vol.AccessMode == string(longhorn.AccessModeReadWriteMany) {
return nil, errors.New("cannot set both share and exclusive to true")
}
if isExclusive {
vol.AccessMode = string(longhorn.AccessModeReadWriteOncePod)
}
}

if vol.AccessMode == "" {
vol.AccessMode = string(longhorn.AccessModeReadWriteOnce)
}

if migratable, ok := volOptions["migratable"]; ok {
isMigratable, err := strconv.ParseBool(migratable)
if err != nil {
Expand Down Expand Up @@ -436,7 +451,7 @@
// requiresSharedAccess checks if the volume is requested to be multi node capable
// a volume that is already in shared access mode, must be used via shared access
// even if single node access is requested.
func requiresSharedAccess(vol *longhornclient.Volume, cap *csi.VolumeCapability) bool {

Check notice on line 454 in csi/util.go

View check run for this annotation

codefactor.io / CodeFactor

csi/util.go#L454

Redefinition of the built-in function cap. (redefines-builtin-id)
isSharedVolume := false
if vol != nil {
isSharedVolume = vol.AccessMode == string(longhorn.AccessModeReadWriteMany) || vol.Migratable
Expand All @@ -453,6 +468,20 @@
mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}

func requireExclusiveAccess(vol *longhornclient.Volume, cap *csi.VolumeCapability) bool {

Check notice on line 471 in csi/util.go

View check run for this annotation

codefactor.io / CodeFactor

csi/util.go#L471

Redefinition of the built-in function cap. (redefines-builtin-id)
isExclusive := false
if vol != nil {
isExclusive = vol.AccessMode == string(longhorn.AccessModeReadWriteOncePod)
}

mode := csi.VolumeCapability_AccessMode_UNKNOWN
if cap != nil {
mode = cap.AccessMode.Mode
}

return isExclusive || mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
}
shikanime marked this conversation as resolved.
Show resolved Hide resolved

func getStageBlockVolumePath(stagingTargetPath, volumeID string) string {
return filepath.Join(stagingTargetPath, volumeID)
}
5 changes: 4 additions & 1 deletion datastore/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,11 @@ func NewPVManifest(size int64, pvName, volumeName, storageClassName, fsType stri
// NewPVCManifestForVolume returns a new PersistentVolumeClaim object for a longhorn volume
func NewPVCManifestForVolume(v *longhorn.Volume, pvName, ns, pvcName, storageClassName string) *corev1.PersistentVolumeClaim {
accessMode := corev1.ReadWriteOnce
if v.Spec.AccessMode == longhorn.AccessModeReadWriteMany {
switch v.Spec.AccessMode {
case longhorn.AccessModeReadWriteMany:
accessMode = corev1.ReadWriteMany
case longhorn.AccessModeReadWriteOncePod:
accessMode = corev1.ReadWriteOncePod
}

return NewPVCManifest(v.Spec.Size, pvName, ns, pvcName, storageClassName, accessMode)
Expand Down
5 changes: 3 additions & 2 deletions k8s/pkg/apis/longhorn/v1beta1/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ const (
type AccessMode string

const (
AccessModeReadWriteOnce = AccessMode("rwo")
AccessModeReadWriteMany = AccessMode("rwx")
AccessModeReadWriteOnce = AccessMode("rwo")
AccessModeReadWriteOncePod = AccessMode("rwop")
AccessModeReadWriteMany = AccessMode("rwx")
)

type ReplicaAutoBalance string
Expand Down
7 changes: 4 additions & 3 deletions k8s/pkg/apis/longhorn/v1beta2/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ const (
DataLocalityStrictLocal = DataLocality("strict-local")
)

// +kubebuilder:validation:Enum=rwo;rwx
// +kubebuilder:validation:Enum=rwo;rwop;rwx
type AccessMode string

const (
AccessModeReadWriteOnce = AccessMode("rwo")
AccessModeReadWriteMany = AccessMode("rwx")
AccessModeReadWriteOnce = AccessMode("rwo")
AccessModeReadWriteOncePod = AccessMode("rwop")
AccessModeReadWriteMany = AccessMode("rwx")
)

// +kubebuilder:validation:Enum=ignored;disabled;least-effort;best-effort
Expand Down
2 changes: 1 addition & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func ValidateDataLocality(mode longhorn.DataLocality) error {
}

func ValidateAccessMode(mode longhorn.AccessMode) error {
if mode != longhorn.AccessModeReadWriteMany && mode != longhorn.AccessModeReadWriteOnce {
if mode != longhorn.AccessModeReadWriteMany && mode != longhorn.AccessModeReadWriteOnce && mode != longhorn.AccessModeReadWriteOncePod {
return fmt.Errorf("invalid access mode: %v", mode)
}
return nil
Expand Down