From 122ebcc6db679ca844010c4e4cd4f1da509c508b Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 8 Dec 2024 16:06:27 +0800 Subject: [PATCH 1/2] chore(vendor): update dependencies Longhorn 9919 Signed-off-by: Derek Su --- go.mod | 2 +- go.sum | 4 +- .../go-spdk-helper/pkg/nvme/initiator.go | 343 ++++++++++-------- .../longhorn/go-spdk-helper/pkg/nvme/nvme.go | 17 +- .../go-spdk-helper/pkg/nvme/nvmecli.go | 10 +- .../pkg/spdk/client/advanced.go | 5 + .../go-spdk-helper/pkg/types/types.go | 14 + .../go-spdk-helper/pkg/util/device.go | 12 +- vendor/modules.txt | 2 +- 9 files changed, 236 insertions(+), 173 deletions(-) diff --git a/go.mod b/go.mod index cb8be1c8..2c76d5ba 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/longhorn/backupstore v0.0.0-20241208060255-5c474bb003bd github.com/longhorn/go-common-libs v0.0.0-20241208100509-e1932c65c078 - github.com/longhorn/go-spdk-helper v0.0.0-20241208060229-9425f07f800a + github.com/longhorn/go-spdk-helper v0.0.0-20241209014045-079275cb3845 github.com/longhorn/types v0.0.0-20241208031854-891e672bc453 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 04481694..26a71e8e 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/longhorn/backupstore v0.0.0-20241208060255-5c474bb003bd h1:lcqAEYf2+Z github.com/longhorn/backupstore v0.0.0-20241208060255-5c474bb003bd/go.mod h1:35urUcY2wO9lgFQnoR4l9i4Hc/x/fNzI8Pn/dloPSd0= github.com/longhorn/go-common-libs v0.0.0-20241208100509-e1932c65c078 h1:QnN9bPRhWr38ziOdknlC+NORk0PgQJcPX1vo20oVi9I= github.com/longhorn/go-common-libs v0.0.0-20241208100509-e1932c65c078/go.mod h1:whDcaYDin1L7uaKTpr86RxpfOT+VJQbubDWdEGPnvVs= -github.com/longhorn/go-spdk-helper v0.0.0-20241208060229-9425f07f800a h1:o8ycbukiUGbBPNGnGbU3H+YSbjNsdk5KgJHoiWkpjmA= -github.com/longhorn/go-spdk-helper v0.0.0-20241208060229-9425f07f800a/go.mod h1:JoJT6We5ctsc/I4t/ucRLqqlTgyXAJIVTl0P4KW33do= +github.com/longhorn/go-spdk-helper v0.0.0-20241209014045-079275cb3845 h1:UpwS/9KZN34T2C9a8vYaJPPNWqdWOTkDBceMSipsq8M= +github.com/longhorn/go-spdk-helper v0.0.0-20241209014045-079275cb3845/go.mod h1:isAM1U36SWOh7XWfktlbveHWSLXV3HfEF7p/tyNqAUQ= github.com/longhorn/types v0.0.0-20241208031854-891e672bc453 h1:NdPh1ARYoBnFqCn3sRNJXa+WeJZcS+J0wXpJBou26KM= github.com/longhorn/types v0.0.0-20241208031854-891e672bc453/go.mod h1:ZElOIs7s/Cjaw7P9kY+uvTzh87mfO34pk39B6TVmg0g= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go index e9fdac98..8845f2d8 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go @@ -3,6 +3,7 @@ package nvme import ( "fmt" "os" + "path/filepath" "strconv" "strings" "time" @@ -21,17 +22,19 @@ const ( LockFile = "/var/run/longhorn-spdk.lock" LockTimeout = 120 * time.Second - maxNumRetries = 15 - retryInterval = 1 * time.Second - - maxNumWaitDeviceRetries = 60 - waitDeviceInterval = 1 * time.Second - HostProc = "/host/proc" validateDiskCreationTimeout = 30 // seconds ) +const ( + maxConnectTargetRetries = 15 + retryConnectTargetInterval = 1 * time.Second + + maxWaitDeviceRetries = 60 + waitDeviceInterval = 1 * time.Second +) + type Initiator struct { Name string SubsystemNQN string @@ -51,10 +54,14 @@ type Initiator struct { logger logrus.FieldLogger } -// NewInitiator creates a new NVMe initiator +// NewInitiator creates a new NVMe-oF initiator func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) { - if name == "" || subsystemNQN == "" { - return nil, fmt.Errorf("empty name or subsystem for initiator creation") + if name == "" { + return nil, fmt.Errorf("empty name for NVMe-oF initiator creation") + } + + if subsystemNQN == "" { + return nil, fmt.Errorf("empty subsystem for NVMe-oF initiator creation") } // If transportAddress or transportServiceID is empty, the initiator is still valid for stopping @@ -131,8 +138,8 @@ func (i *Initiator) DisconnectTarget() error { return DisconnectTarget(i.SubsystemNQN, i.executor) } -// WaitForConnect waits for the NVMe initiator to connect -func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duration) (err error) { +// WaitForConnect waits for the NVMe-oF initiator to connect +func (i *Initiator) WaitForConnect(maxRetries int, retryInterval time.Duration) (err error) { if i.hostProc != "" { lock, err := i.newLock() if err != nil { @@ -141,7 +148,7 @@ func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duratio defer lock.Unlock() } - for r := 0; r < maxNumRetries; r++ { + for r := 0; r < maxRetries; r++ { err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) if err == nil { return nil @@ -152,8 +159,8 @@ func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duratio return err } -// WaitForDisconnect waits for the NVMe initiator to disconnect -func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Duration) (err error) { +// WaitForDisconnect waits for the NVMe-oF initiator to disconnect +func (i *Initiator) WaitForDisconnect(maxRetries int, retryInterval time.Duration) (err error) { if i.hostProc != "" { lock, err := i.newLock() if err != nil { @@ -162,9 +169,9 @@ func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Dura defer lock.Unlock() } - for r := 0; r < maxNumRetries; r++ { + for r := 0; r < maxRetries; r++ { err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) - if IsValidNvmeDeviceNotFound(err) { + if types.ErrorIsValidNvmeDeviceNotFound(err) { return nil } time.Sleep(retryInterval) @@ -173,7 +180,7 @@ func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Dura return err } -// Suspend suspends the device mapper device for the NVMe initiator +// Suspend suspends the device mapper device for the NVMe-oF initiator func (i *Initiator) Suspend(noflush, nolockfs bool) error { if i.hostProc != "" { lock, err := i.newLock() @@ -185,19 +192,19 @@ func (i *Initiator) Suspend(noflush, nolockfs bool) error { suspended, err := i.IsSuspended() if err != nil { - return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe-oF initiator %s", i.Name) } if !suspended { if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil { - return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to suspend linear dm device for NVMe-oF initiator %s", i.Name) } } return nil } -// Resume resumes the device mapper device for the NVMe initiator +// Resume resumes the device mapper device for the NVMe-oF initiator func (i *Initiator) Resume() error { if i.hostProc != "" { lock, err := i.newLock() @@ -208,14 +215,14 @@ func (i *Initiator) Resume() error { } if err := i.resumeLinearDmDevice(); err != nil { - return errors.Wrapf(err, "failed to resume device mapper device for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to resume linear dm device for NVMe-oF initiator %s", i.Name) } return nil } func (i *Initiator) resumeLinearDmDevice() error { - logrus.Infof("Resuming linear dm device %s", i.Name) + i.logger.Info("Resuming linear dm device") return util.DmsetupResume(i.Name, i.executor) } @@ -223,44 +230,42 @@ func (i *Initiator) resumeLinearDmDevice() error { func (i *Initiator) replaceDmDeviceTarget() error { suspended, err := i.IsSuspended() if err != nil { - return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe-oF initiator %s", i.Name) } if !suspended { if err := i.suspendLinearDmDevice(true, false); err != nil { - return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to suspend linear dm device for NVMe-oF initiator %s", i.Name) } } if err := i.reloadLinearDmDevice(); err != nil { - return errors.Wrapf(err, "failed to reload linear dm device for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to reload linear dm device for NVMe-oF initiator %s", i.Name) } if err := i.resumeLinearDmDevice(); err != nil { - return errors.Wrapf(err, "failed to resume linear dm device for NVMe initiator %s", i.Name) + return errors.Wrapf(err, "failed to resume linear dm device for NVMe-oF initiator %s", i.Name) } return nil } -// Start starts the NVMe initiator with the given transportAddress and transportServiceID -func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceAndEndpointCleanupRequired bool) (dmDeviceBusy bool, err error) { +// Start starts the NVMe-oF initiator with the given transportAddress and transportServiceID +func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceAndEndpointCleanupRequired bool) (dmDeviceIsBusy bool, err error) { defer func() { if err != nil { - err = errors.Wrapf(err, "failed to start NVMe initiator %s", i.Name) + err = errors.Wrapf(err, "failed to start NVMe-oF initiator %s", i.Name) } }() + if transportAddress == "" || transportServiceID == "" { + return false, fmt.Errorf("invalid transportAddress %s and transportServiceID %s for starting initiator %s", transportAddress, transportServiceID, i.Name) + } + i.logger.WithFields(logrus.Fields{ "transportAddress": transportAddress, "transportServiceID": transportServiceID, "dmDeviceAndEndpointCleanupRequired": dmDeviceAndEndpointCleanupRequired, - }) - - i.logger.Info("Starting initiator") - - if transportAddress == "" || transportServiceID == "" { - return false, fmt.Errorf("invalid TransportAddress %s and TransportServiceID %s for initiator %s start", transportAddress, transportServiceID, i.Name) - } + }).Info("Starting NVMe-oF initiator") if i.hostProc != "" { lock, err := i.newLock() @@ -270,105 +275,114 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceA defer lock.Unlock() } - // Check if the initiator/NVMe device is already launched and matches the params - if err := i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN); err == nil { + // Check if the initiator/NVMe-oF device is already launched and matches the params + err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) + if err == nil { if i.TransportAddress == transportAddress && i.TransportServiceID == transportServiceID { - if err = i.LoadEndpoint(false); err == nil { - i.logger.Info("NVMe initiator is already launched with correct params") + err = i.LoadEndpoint(false) + if err == nil { + i.logger.Info("NVMe-oF initiator is already launched with correct params") return false, nil } - i.logger.WithError(err).Warnf("NVMe initiator is launched with failed to load the endpoint") + i.logger.WithError(err).Warnf("NVMe-oF initiator is launched with failed to load the endpoint") } else { - i.logger.Warnf("NVMe initiator is launched but with incorrect address, the required one is %s:%s, will try to stop then relaunch it", - transportAddress, transportServiceID) + i.logger.Warnf("NVMe-oF initiator is launched but with incorrect address, the required one is %s:%s, will try to stop then relaunch it", transportAddress, transportServiceID) } } - i.logger.Infof("Stopping NVMe initiator blindly before starting") - dmDeviceBusy, err = i.stopWithoutLock(dmDeviceAndEndpointCleanupRequired, false, false) + i.logger.Info("Stopping NVMe-oF initiator blindly before starting") + dmDeviceIsBusy, err = i.stopWithoutLock(dmDeviceAndEndpointCleanupRequired, false, false) if err != nil { - return dmDeviceBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe initiator %s before starting", i.Name) + return dmDeviceIsBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe-oF initiator %s before starting", i.Name) } - i.logger.WithFields(logrus.Fields{ - "transportAddress": transportAddress, - "transportServiceID": transportServiceID, - }) - i.logger.Infof("Launching NVMe initiator") - - // Setup initiator - for r := 0; r < maxNumRetries; r++ { - // Rerun this API for a discovered target should be fine - subsystemNQN, err := DiscoverTarget(transportAddress, transportServiceID, i.executor) - if err != nil { - i.logger.WithError(err).Warn("Failed to discover target") - time.Sleep(retryInterval) - continue - } - - controllerName, err := ConnectTarget(transportAddress, transportServiceID, subsystemNQN, i.executor) - if err != nil { - i.logger.WithError(err).Warn("Failed to connect target") - time.Sleep(retryInterval) - continue - } - - i.SubsystemNQN = subsystemNQN - i.ControllerName = controllerName - break - } + i.logger.Info("Launching NVMe-oF initiator") + i.connectTarget(transportAddress, transportServiceID, maxConnectTargetRetries, retryConnectTargetInterval) if i.ControllerName == "" { - return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, maxNumRetries, retryInterval.Seconds()) + return dmDeviceIsBusy, fmt.Errorf("failed to start NVMe-oF initiator %s within %d * %v sec retries", i.Name, maxConnectTargetRetries, retryConnectTargetInterval.Seconds()) } - for r := 0; r < maxNumWaitDeviceRetries; r++ { - err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) - if err == nil { - break - } - time.Sleep(waitDeviceInterval) - } + err = i.waitAndLoadNVMeDeviceInfoWithoutLock(transportAddress, transportServiceID) if err != nil { - return dmDeviceBusy, errors.Wrapf(err, "failed to load device info after starting NVMe initiator %s", i.Name) + return dmDeviceIsBusy, errors.Wrapf(err, "failed to load device info after connecting target for NVMe-oF initiator %s", i.Name) } - needMakeEndpoint := true if dmDeviceAndEndpointCleanupRequired { - if dmDeviceBusy { + if dmDeviceIsBusy { // Endpoint is already created, just replace the target device - needMakeEndpoint = false - i.logger.Infof("Linear dm device is busy, trying the best to replace the target device for NVMe initiator %s", i.Name) + i.logger.Info("Linear dm device is busy, trying the best to replace the target device for NVMe-oF initiator") if err := i.replaceDmDeviceTarget(); err != nil { - i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name) + i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe-oF initiator") } else { - i.logger.Infof("Successfully replaced the target device for NVMe initiator %s", i.Name) - dmDeviceBusy = false + i.logger.Info("Successfully replaced the target device for NVMe-oF initiator") + dmDeviceIsBusy = false } } else { - i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name) + i.logger.Info("Creating linear dm device for NVMe-oF initiator") if err := i.createLinearDmDevice(); err != nil { - return false, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name) + return false, errors.Wrapf(err, "failed to create linear dm device for NVMe-oF initiator %s", i.Name) } } } else { - i.logger.Infof("Skipping creating linear dm device for NVMe initiator %s", i.Name) + i.logger.Info("Skipping creating linear dm device for NVMe-oF initiator") i.dev.Export = i.dev.Nvme } - if needMakeEndpoint { - i.logger.Infof("Creating endpoint %v", i.Endpoint) + i.logger.Infof("Creating endpoint %v", i.Endpoint) + exist, err := i.isEndpointExist() + if err != nil { + return dmDeviceIsBusy, errors.Wrapf(err, "failed to check if endpoint %v exists for NVMe-oF initiator %s", i.Endpoint, i.Name) + } + if exist { + i.logger.Infof("Skipping endpoint %v creation for NVMe-oF initiator", i.Endpoint) + } else { if err := i.makeEndpoint(); err != nil { - return dmDeviceBusy, err + return dmDeviceIsBusy, err } } - i.logger.Infof("Launched NVMe initiator: %+v", i) + i.logger.Infof("Launched NVMe-oF initiator: %+v", i) - return dmDeviceBusy, nil + return dmDeviceIsBusy, nil } -func (i *Initiator) Stop(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { +func (i *Initiator) waitAndLoadNVMeDeviceInfoWithoutLock(transportAddress, transportServiceID string) (err error) { + for r := 0; r < maxWaitDeviceRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock(transportAddress, transportServiceID, i.SubsystemNQN) + if err == nil { + break + } + time.Sleep(waitDeviceInterval) + } + return err +} + +func (i *Initiator) connectTarget(transportAddress, transportServiceID string, maxRetries int, retryInterval time.Duration) { + for r := 0; r < maxRetries; r++ { + // Rerun this API for a discovered target should be fine + subsystemNQN, err := DiscoverTarget(transportAddress, transportServiceID, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to discover target") + time.Sleep(retryInterval) + continue + } + + controllerName, err := ConnectTarget(transportAddress, transportServiceID, subsystemNQN, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to connect target") + time.Sleep(retryInterval) + continue + } + + i.SubsystemNQN = subsystemNQN + i.ControllerName = controllerName + break + } +} + +// Stop stops the NVMe-oF initiator +func (i *Initiator) Stop(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, returnErrorForBusyDevice bool) (bool, error) { if i.hostProc != "" { lock, err := i.newLock() if err != nil { @@ -377,37 +391,36 @@ func (i *Initiator) Stop(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanu defer lock.Unlock() } - return i.stopWithoutLock(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice) + return i.stopWithoutLock(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, returnErrorForBusyDevice) } -func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { - if err := i.removeLinearDmDevice(false, deferDmDeviceCleanup); err != nil { - if strings.Contains(err.Error(), "Device or resource busy") { - if errOnBusyDmDevice { - return true, err +func (i *Initiator) stopWithoutLock(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, returnErrorForBusyDevice bool) (dmDeviceIsBusy bool, err error) { + dmDeviceIsBusy = false + + if dmDeviceAndEndpointCleanupRequired { + err = i.removeLinearDmDevice(false, deferDmDeviceCleanup) + if err != nil { + if !os.IsNotExist(err) { + if types.ErrorIsDeviceOrResourceBusy(err) { + if returnErrorForBusyDevice { + return true, err + } + dmDeviceIsBusy = true + } else { + return false, err + } } - return true, nil } - return false, err - } - if err := i.removeEndpoint(); err != nil { - return false, err - } - return false, nil -} -func (i *Initiator) stopWithoutLock(dmDeviceAndEndpointCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { - dmDeviceBusy := false - if dmDeviceAndEndpointCleanupRequired { - var err error - dmDeviceBusy, err = i.removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice) + err = i.removeEndpoint() if err != nil { return false, err } } - if err := DisconnectTarget(i.SubsystemNQN, i.executor); err != nil { - return dmDeviceBusy, errors.Wrapf(err, "failed to logout target") + err = DisconnectTarget(i.SubsystemNQN, i.executor) + if err != nil { + return dmDeviceIsBusy, errors.Wrapf(err, "failed to disconnect target for NVMe-oF initiator %s", i.Name) } i.ControllerName = "" @@ -415,25 +428,30 @@ func (i *Initiator) stopWithoutLock(dmDeviceAndEndpointCleanupRequired, deferDmD i.TransportAddress = "" i.TransportServiceID = "" - return dmDeviceBusy, nil + return dmDeviceIsBusy, nil } +// GetControllerName returns the controller name func (i *Initiator) GetControllerName() string { return i.ControllerName } +// GetNamespaceName returns the namespace name func (i *Initiator) GetNamespaceName() string { return i.NamespaceName } +// GetTransportAddress returns the transport address func (i *Initiator) GetTransportAddress() string { return i.TransportAddress } +// GetTransportServiceID returns the transport service ID func (i *Initiator) GetTransportServiceID() string { return i.TransportServiceID } +// GetEndpoint returns the endpoint func (i *Initiator) GetEndpoint() string { if i.isUp { return i.Endpoint @@ -441,6 +459,7 @@ func (i *Initiator) GetEndpoint() string { return "" } +// GetDevice returns the device information func (i *Initiator) LoadNVMeDeviceInfo(transportAddress, transportServiceID, subsystemNQN string) (err error) { if i.hostProc != "" { lock, err := i.newLock() @@ -459,28 +478,29 @@ func (i *Initiator) loadNVMeDeviceInfoWithoutLock(transportAddress, transportSer return err } if len(nvmeDevices) != 1 { - return fmt.Errorf("found zero or multiple devices NVMe initiator %s", i.Name) + return fmt.Errorf("found zero or multiple devices NVMe-oF initiator %s", i.Name) } if len(nvmeDevices[0].Namespaces) != 1 { - return fmt.Errorf("found zero or multiple devices for NVMe initiator %s", i.Name) + return fmt.Errorf("found zero or multiple devices for NVMe-oF initiator %s", i.Name) } if i.ControllerName != "" && i.ControllerName != nvmeDevices[0].Controllers[0].Controller { - return fmt.Errorf("found mismatching between the detected controller name %s and the recorded value %s for NVMe initiator %s", nvmeDevices[0].Controllers[0].Controller, i.ControllerName, i.Name) + return fmt.Errorf("found mismatching between the detected controller name %s and the recorded value %s for NVMe-oF initiator %s", nvmeDevices[0].Controllers[0].Controller, i.ControllerName, i.Name) } + i.ControllerName = nvmeDevices[0].Controllers[0].Controller i.NamespaceName = nvmeDevices[0].Namespaces[0].NameSpace i.TransportAddress, i.TransportServiceID = GetIPAndPortFromControllerAddress(nvmeDevices[0].Controllers[0].Address) - i.logger.WithFields(logrus.Fields{ + i.logger = i.logger.WithFields(logrus.Fields{ "controllerName": i.ControllerName, "namespaceName": i.NamespaceName, "transportAddress": i.TransportAddress, "transportServiceID": i.TransportServiceID, }) - devicePath := fmt.Sprintf("/dev/%s", i.NamespaceName) - dev, err := util.DetectDevice(devicePath, i.executor) + devPath := filepath.Join("/dev", i.NamespaceName) + dev, err := util.DetectDevice(devPath, i.executor) if err != nil { - return errors.Wrapf(err, "cannot find the device for NVMe initiator %s with namespace name %s", i.Name, i.NamespaceName) + return errors.Wrapf(err, "cannot find the device for NVMe-oF initiator %s with namespace name %s", i.Name, i.NamespaceName) } i.dev = &util.LonghornBlockDevice{ @@ -506,7 +526,8 @@ func (i *Initiator) findDependentDevices(devName string) ([]string, error) { return depDevices, nil } -func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error { +// LoadEndpoint loads the endpoint +func (i *Initiator) LoadEndpoint(dmDeviceIsBusy bool) error { dev, err := util.DetectDevice(i.Endpoint, i.executor) if err != nil { return err @@ -517,11 +538,11 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error { return err } - if dmDeviceBusy { - i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %v due to device busy", i.Endpoint, i.Name) + if dmDeviceIsBusy { + i.logger.Debugf("Skipping endpoint %v loading due to device busy", i.Endpoint) } else { if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) { - return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Name, i.Endpoint, i.Name) + return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe-oF initiator %s", dev.Name, i.Endpoint, i.Name) } } @@ -533,6 +554,17 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error { return nil } +func (i *Initiator) isEndpointExist() (bool, error) { + _, err := os.Stat(i.Endpoint) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + return true, nil +} + func (i *Initiator) makeEndpoint() error { if err := util.DuplicateDevice(i.dev, i.Endpoint); err != nil { return errors.Wrap(err, "failed to duplicate device") @@ -542,26 +574,25 @@ func (i *Initiator) makeEndpoint() error { } func (i *Initiator) removeEndpoint() error { + if i.Endpoint == "" { + return nil + } + if err := util.RemoveDevice(i.Endpoint); err != nil { return err } i.dev = nil i.isUp = false - return nil } func (i *Initiator) removeLinearDmDevice(force, deferred bool) error { - devPath := getDmDevicePath(i.Name) - if _, err := os.Stat(devPath); err != nil { - if os.IsNotExist(err) { - logrus.Infof("Linear dm device %s doesn't exist", devPath) - return nil - } - return errors.Wrapf(err, "failed to stat linear dm device %s", devPath) + dmDevPath := getDmDevicePath(i.Name) + if _, err := os.Stat(dmDevPath); err != nil { + return err } - logrus.Infof("Removing linear dm device %s", i.Name) + i.logger.Info("Removing linear dm device") return util.DmsetupRemove(i.Name, force, deferred, i.executor) } @@ -578,7 +609,8 @@ func (i *Initiator) createLinearDmDevice() error { // Create a device mapper device with the same size as the original device table := fmt.Sprintf("0 %v linear %v 0", sectors, nvmeDevPath) - logrus.Infof("Creating linear dm device %s with table %s", i.Name, table) + + i.logger.Infof("Creating linear dm device with table '%s'", table) if err := util.DmsetupCreate(i.Name, table, i.executor); err != nil { return err } @@ -588,6 +620,7 @@ func (i *Initiator) createLinearDmDevice() error { return err } + // Get the device numbers major, minor, err := util.GetDeviceNumbers(dmDevPath, i.executor) if err != nil { return err @@ -613,7 +646,7 @@ func validateDiskCreation(path string, timeout int) error { } func (i *Initiator) suspendLinearDmDevice(noflush, nolockfs bool) error { - logrus.Infof("Suspending linear dm device %s", i.Name) + i.logger.Info("Suspending linear dm device") return util.DmsetupSuspend(i.Name, noflush, nolockfs, i.executor) } @@ -664,15 +697,27 @@ func (i *Initiator) reloadLinearDmDevice() error { table := fmt.Sprintf("0 %v linear %v 0", sectors, devPath) - logrus.Infof("Reloading linear dm device %s with table '%s'", i.Name, table) + i.logger.Infof("Reloading linear dm device with table '%s'", table) - return util.DmsetupReload(i.Name, table, i.executor) -} + err = util.DmsetupReload(i.Name, table, i.executor) + if err != nil { + return err + } -func getDmDevicePath(name string) string { - return fmt.Sprintf("/dev/mapper/%s", name) + // Reload the device numbers + dmDevPath := getDmDevicePath(i.Name) + major, minor, err := util.GetDeviceNumbers(dmDevPath, i.executor) + if err != nil { + return err + } + + i.dev.Export.Name = i.Name + i.dev.Export.Major = major + i.dev.Export.Minor = minor + + return nil } -func IsValidNvmeDeviceNotFound(err error) bool { - return strings.Contains(err.Error(), ErrorMessageCannotFindValidNvmeDevice) +func getDmDevicePath(name string) string { + return filepath.Join("/dev/mapper", name) } diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go index 75e26da9..764ba670 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go @@ -8,10 +8,8 @@ import ( "github.com/sirupsen/logrus" commonns "github.com/longhorn/go-common-libs/ns" -) -const ( - ErrorMessageCannotFindValidNvmeDevice = "cannot find a valid nvme device" + "github.com/longhorn/go-spdk-helper/pkg/types" ) // DiscoverTarget discovers a target @@ -59,7 +57,7 @@ func ConnectTarget(ip, port, nqn string, executor *commonns.Executor) (controlle return connect(hostID, hostNQN, nqn, DefaultTransportType, ip, port, executor) } -// DisconnectTarget disconnects a target +// DisconnectTarget disconnects from a target func DisconnectTarget(nqn string, executor *commonns.Executor) error { return disconnect(nqn, executor) } @@ -72,22 +70,21 @@ func GetDevices(ip, port, nqn string, executor *commonns.Executor) (devices []De devices = []Device{} - nvmeDevices, err := listControllers(executor) + nvmeDevices, err := listRecognizedNvmeDevices(executor) if err != nil { return nil, err } for _, d := range nvmeDevices { - // Get subsystem subsystems, err := listSubsystems(d.DevicePath, executor) if err != nil { - logrus.WithError(err).Warnf("failed to get subsystem for nvme device %s", d.DevicePath) + logrus.WithError(err).Warnf("failed to list subsystems for NVMe device %s", d.DevicePath) continue } if len(subsystems) == 0 { - return nil, fmt.Errorf("no subsystem found for nvme device %s", d.DevicePath) + return nil, fmt.Errorf("no subsystem found for NVMe device %s", d.DevicePath) } if len(subsystems) > 1 { - return nil, fmt.Errorf("multiple subsystems found for nvme device %s", d.DevicePath) + return nil, fmt.Errorf("multiple subsystems found for NVMe device %s", d.DevicePath) } sys := subsystems[0] @@ -166,7 +163,7 @@ func GetDevices(ip, port, nqn string, executor *commonns.Executor) (devices []De } } - return nil, fmt.Errorf(ErrorMessageCannotFindValidNvmeDevice+" with subsystem NQN %s and address %s:%s", nqn, ip, port) + return nil, fmt.Errorf(types.ErrorMessageCannotFindValidNvmeDevice+" with subsystem NQN %s and address %s:%s", nqn, ip, port) } return res, nil } diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go index adc61996..b656d7d8 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go @@ -149,12 +149,12 @@ func listSubsystems(devicePath string, executor *commonns.Executor) ([]Subsystem } if major == 1 { - return listSubsystemsV1(jsonStr, executor) + return listSubsystemsV1(jsonStr) } - return listSubsystemsV2(jsonStr, executor) + return listSubsystemsV2(jsonStr) } -func listSubsystemsV1(jsonStr string, executor *commonns.Executor) ([]Subsystem, error) { +func listSubsystemsV1(jsonStr string) ([]Subsystem, error) { output := map[string][]Subsystem{} if err := json.Unmarshal([]byte(jsonStr), &output); err != nil { return nil, err @@ -169,7 +169,7 @@ type ListSubsystemsV2Output struct { Subsystems []Subsystem `json:"Subsystems"` } -func listSubsystemsV2(jsonStr string, executor *commonns.Executor) ([]Subsystem, error) { +func listSubsystemsV2(jsonStr string) ([]Subsystem, error) { var output []ListSubsystemsV2Output if err := json.Unmarshal([]byte(jsonStr), &output); err != nil { return nil, err @@ -197,7 +197,7 @@ type CliDevice struct { SectorSize int32 `json:"SectorSize,omitempty"` } -func listControllers(executor *commonns.Executor) ([]CliDevice, error) { +func listRecognizedNvmeDevices(executor *commonns.Executor) ([]CliDevice, error) { opts := []string{ "list", "-o", "json", diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/spdk/client/advanced.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/spdk/client/advanced.go index 5fefbe40..3c4fce1b 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/spdk/client/advanced.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/spdk/client/advanced.go @@ -8,6 +8,7 @@ import ( spdktypes "github.com/longhorn/go-spdk-helper/pkg/spdk/types" ) +// AddDevice adds a device with the given device path, name, and cluster size. func (c *Client) AddDevice(devicePath, name string, clusterSize uint32) (bdevAioName, lvsName, lvsUUID string, err error) { // Use the file name as aio name and lvs name if name is not specified. if name == "" { @@ -39,6 +40,7 @@ func (c *Client) AddDevice(devicePath, name string, clusterSize uint32) (bdevAio return name, name, lvsUUID, nil } +// DeleteDevice deletes the device with the given bdevAioName and lvsName. func (c *Client) DeleteDevice(bdevAioName, lvsName string) (err error) { if _, err := c.BdevLvolDeleteLvstore(lvsName, ""); err != nil { return err @@ -51,6 +53,7 @@ func (c *Client) DeleteDevice(bdevAioName, lvsName string) (err error) { return nil } +// StartExposeBdev exposes the bdev with the given nqn, bdevName, nguid, ip, and port. func (c *Client) StartExposeBdev(nqn, bdevName, nguid, ip, port string) error { nvmfTransportList, err := c.NvmfGetTransports("", "") if err != nil { @@ -77,8 +80,10 @@ func (c *Client) StartExposeBdev(nqn, bdevName, nguid, ip, port string) error { return nil } +// StopExposeBdev stops exposing the bdev with the given nqn. func (c *Client) StopExposeBdev(nqn string) error { var subsystem *spdktypes.NvmfSubsystem + subsystemList, err := c.NvmfGetSubsystems("", "") if err != nil { return err diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go index 12481635..74a2d0c8 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "strings" "time" ) @@ -25,6 +26,11 @@ const ( ExecuteTimeout = 60 * time.Second ) +const ( + ErrorMessageCannotFindValidNvmeDevice = "cannot find a valid NVMe device" + ErrorMessageDeviceOrResourceBusy = "device or resource busy" +) + const ( // Sequence of Events: // 1. Issuing I/O Command: The system sends a read command to the NVMe SSD. @@ -69,3 +75,11 @@ type DiskStatus struct { Device string BlockDevices string } + +func ErrorIsDeviceOrResourceBusy(err error) bool { + return strings.Contains(strings.ToLower(err.Error()), ErrorMessageDeviceOrResourceBusy) +} + +func ErrorIsValidNvmeDeviceNotFound(err error) bool { + return strings.Contains(err.Error(), ErrorMessageCannotFindValidNvmeDevice) +} diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go index 32009875..2610cc04 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go @@ -41,13 +41,15 @@ type LonghornBlockDevice struct { } // RemoveDevice removes the given device -func RemoveDevice(dev string) error { - if _, err := os.Stat(dev); err == nil { - if err := remove(dev); err != nil { - return errors.Wrapf(err, "failed to removing device %s", dev) +func RemoveDevice(devPath string) error { + if _, err := os.Stat(devPath); err != nil { + if os.IsNotExist(err) { + return nil } } - return nil + + // Still try to remove the device node + return remove(devPath) } // GetKnownDevices returns the path of the device with the given major and minor numbers diff --git a/vendor/modules.txt b/vendor/modules.txt index be508518..7e1b600d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -64,7 +64,7 @@ github.com/longhorn/go-common-libs/sync github.com/longhorn/go-common-libs/sys github.com/longhorn/go-common-libs/types github.com/longhorn/go-common-libs/utils -# github.com/longhorn/go-spdk-helper v0.0.0-20241208060229-9425f07f800a +# github.com/longhorn/go-spdk-helper v0.0.0-20241209014045-079275cb3845 ## explicit; go 1.22.7 github.com/longhorn/go-spdk-helper/pkg/jsonrpc github.com/longhorn/go-spdk-helper/pkg/nvme From fbe02ea0e80cc6dc672abb9bdf59d67571844896 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Mon, 9 Dec 2024 12:55:52 +0800 Subject: [PATCH 2/2] refactor: mainly refactor engine codes Longhorn 9919 Signed-off-by: Derek Su --- pkg/client/client.go | 3 +- pkg/spdk/engine.go | 524 ++++++++++++++++++++++++++----------------- pkg/spdk/replica.go | 8 +- pkg/spdk/server.go | 2 +- pkg/spdk_test.go | 86 +------ 5 files changed, 327 insertions(+), 296 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 110069f8..8662fad1 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -474,7 +474,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,7 +490,6 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, - UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, SalvageRequested: salvageRequested, diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 5a11e76d..65a9119f 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.uber.org/multierr" grpccodes "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -41,9 +42,9 @@ type Engine struct { SpecSize uint64 ActualSize uint64 IP string - Port int32 + Port int32 // Port that initiator is connecting to TargetIP string - TargetPort int32 + TargetPort int32 // Port of the target that is used for letting initiator connect to Frontend string Endpoint string Nqn string @@ -54,8 +55,8 @@ type Engine struct { ReplicaStatusMap map[string]*EngineReplicaStatus - initiator *nvme.Initiator - dmDeviceBusy bool + initiator *nvme.Initiator + dmDeviceIsBusy bool State types.InstanceState ErrorMsg string @@ -113,10 +114,46 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) isNewEngine() bool { + return e.IP == "" && e.TargetIP == "" +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + // If the engine is running on the same pod, it should have both initiator and target instances + if e.Port == 0 && e.TargetPort == 0 { + // Both initiator and target instances are not created yet + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + // Only target instance creation is required, because the initiator instance is already running + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort) + } + } else if podIP == initiatorIP && podIP != targetIP { + // Only initiator instance creation is required, because the target instance is running on a different pod + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP && podIP != initiatorIP { + // Only target instance creation is required, because the initiator instance is running on a different pod + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target addresses for engine %s creation with initiator address %v and target address %v", e.Name, initiatorIP, targetIP) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, - "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, @@ -128,7 +165,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -143,6 +179,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -177,29 +214,30 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if e.isNewEngine() { + if initiatorCreationRequired { + e.IP = initiatorIP + } + e.TargetIP = targetIP + } - // For creating target on attached node - initiatorCreationRequired = false + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) + + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -231,18 +269,14 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - // For reconstructing engine after switching over target to another node - initiatorCreationRequired = false - - e.IP = targetIP + e.log.Info("Skipping target creation during engine creation") - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -253,21 +287,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -276,12 +306,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -366,7 +404,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) } @@ -376,45 +415,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) + targetIP, targetPort, err := splitHostPort(targetAddress) if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + return err } - targetIP, targetPort, err := splitHostPort(targetAddress) + e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) + + dmDeviceIsBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { - return errors.Wrapf(err, "failed to split target address %v", targetAddress) + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) } - e.Nqn = helpertypes.GetNQN(e.Name) + defer func() { + if err == nil { + e.initiator = initiator + e.dmDeviceIsBusy = dmDeviceIsBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + + e.log.Infof("Finished handling frontend for engine: %+v", e) + } + }() - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + // TODO: + // "nvme list -o json" might be empty devices for a while instance manager pod is just started. + // The root cause is not clear, so we need to retry to load NVMe device info. + for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Info("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - port, _, err = superiorPortAllocator.AllocateRange(portCount) + err = initiator.LoadEndpoint(false) if err != nil { - return err + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err - } + return nil + } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port - } else { - e.TargetPort = port - } - } else { - e.Port = targetPort + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + e.TargetPort = port + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -422,57 +503,20 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + dmDeviceIsBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } - - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Info("Deleting engine") - requireUpdate := false e.Lock() @@ -502,37 +546,35 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *co } }() - if e.Endpoint != "" { - nqn := helpertypes.GetNQN(e.Name) + e.log.Info("Deleting engine") - if e.initiator != nil { - if _, err := e.initiator.Stop(true, true, true); err != nil { - return err - } - e.initiator = nil - } + if e.Nqn == "" { + e.Nqn = helpertypes.GetNQN(e.Name) + } - if err := spdkClient.StopExposeBdev(nqn); err != nil { + // Stop the frontend + if e.initiator != nil { + if _, err := e.initiator.Stop(true, true, true); err != nil { return err } - + e.initiator = nil e.Endpoint = "" + requireUpdate = true } - if e.TargetPort != 0 || e.Port != 0 { - port := e.TargetPort - if port == 0 { - port = e.Port - } - if err := superiorPortAllocator.ReleaseRange(port, port); err != nil { - return err - } - e.TargetPort = 0 - e.Port = 0 - requireUpdate = true + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return err + } + + // Release the ports if they are allocated + err = e.releasePorts(superiorPortAllocator) + if err != nil { + return err } + requireUpdate = true + // Delete the Raid bdev and disconnect the replicas if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return err } @@ -555,6 +597,41 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *co return nil } +func (e *Engine) releasePorts(superiorPortAllocator *commonbitmap.Bitmap) (err error) { + ports := map[int32]struct{}{ + e.Port: {}, + e.TargetPort: {}, + } + + if errRelease := releasePortIfExists(superiorPortAllocator, ports, e.Port); errRelease != nil { + err = multierr.Append(err, errRelease) + } + e.Port = 0 + + if errRelease := releasePortIfExists(superiorPortAllocator, ports, e.TargetPort); errRelease != nil { + err = multierr.Append(err, errRelease) + } + e.TargetPort = 0 + + return err +} + +func releasePortIfExists(superiorPortAllocator *commonbitmap.Bitmap, ports map[int32]struct{}, port int32) error { + if port == 0 { + return nil + } + + _, exists := ports[port] + if exists { + if err := superiorPortAllocator.ReleaseRange(port, port); err != nil { + return err + } + delete(ports, port) + } + + return nil +} + func (e *Engine) Get() (res *spdkrpc.Engine) { e.RLock() defer e.RUnlock() @@ -639,16 +716,8 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -659,6 +728,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { if spdktypes.GetBdevType(bdevRaid) != spdktypes.BdevTypeRaid { return fmt.Errorf("cannot find a raid bdev for engine %v", e.Name) } + bdevRaidSize := bdevRaid.NumBlocks * uint64(bdevRaid.BlockSize) if e.SpecSize != bdevRaidSize { return fmt.Errorf("found mismatching between engine spec size %d and actual raid bdev size %d for engine %s", e.SpecSize, bdevRaidSize, e.Name) @@ -694,6 +764,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { containValidReplica = true } } + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) if !containValidReplica { @@ -834,16 +905,22 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { } func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.NvmfSubsystem) (err error) { - if e.Frontend != types.FrontendEmpty && e.Frontend != types.FrontendSPDKTCPNvmf && e.Frontend != types.FrontendSPDKTCPBlockdev { + if e.Frontend != types.FrontendEmpty && + e.Frontend != types.FrontendSPDKTCPNvmf && + e.Frontend != types.FrontendSPDKTCPBlockdev { return fmt.Errorf("unknown frontend type %s", e.Frontend) } nqn := helpertypes.GetNQN(e.Name) - subsystem := subsystemMap[nqn] + + subsystem, ok := subsystemMap[nqn] + if !ok { + return fmt.Errorf("cannot find the NVMf subsystem for engine %s", e.Name) + } if e.Frontend == types.FrontendEmpty { if subsystem != nil { - return fmt.Errorf("found nvmf subsystem %s for engine %s with empty frontend", nqn, e.Name) + return fmt.Errorf("found NVMf subsystem %s for engine %s with empty frontend", nqn, e.Name) } if e.Endpoint != "" { return fmt.Errorf("found non-empty endpoint %s for engine %s with empty frontend", e.Endpoint, e.Name) @@ -855,7 +932,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv } if subsystem == nil || len(subsystem.ListenAddresses) == 0 { - return fmt.Errorf("cannot find the Nvmf subsystem for engine %s", e.Name) + return fmt.Errorf("cannot find the NVMf subsystem for engine %s", e.Name) } port := 0 @@ -872,7 +949,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv } } if port == 0 || e.Port != int32(port) { - return fmt.Errorf("cannot find a matching listener with port %d from Nvmf subsystem for engine %s", e.Port, e.Name) + return fmt.Errorf("cannot find a matching listener with port %d from NVMf subsystem for engine %s", e.Port, e.Name) } switch e.Frontend { @@ -880,7 +957,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv if e.initiator == nil { initiator, err := nvme.NewInitiator(e.VolumeName, nqn, nvme.HostProc) if err != nil { - return err + return errors.Wrapf(err, "failed to create initiator for engine %v during frontend validation and update", e.Name) } e.initiator = initiator } @@ -892,7 +969,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv } return err } - if err := e.initiator.LoadEndpoint(e.dmDeviceBusy); err != nil { + if err := e.initiator.LoadEndpoint(e.dmDeviceIsBusy); err != nil { return err } blockDevEndpoint := e.initiator.GetEndpoint() @@ -2037,8 +2114,9 @@ func (e *Engine) Suspend(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to suspend engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to suspend engine") + // Engine is still alive and running and is not really in error state. + // longhorn-manager will retry to suspend the engine. } e.ErrorMsg = err.Error() } else { @@ -2069,8 +2147,9 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to resume engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to resume engine") + // Engine is still alive and running and is not really in error state. + // longhorn-manager will retry to resume the engine. } e.ErrorMsg = err.Error() } else { @@ -2098,37 +2177,53 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { } // SwitchOverTarget function in the Engine struct is responsible for switching the engine's target to a new address. -func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress string) (err error) { - e.log.Infof("Switching over engine to target address %s", targetAddress) +func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddress string) (err error) { + e.log.Infof("Switching over engine to target address %s", newTargetAddress) + + if newTargetAddress == "" { + return fmt.Errorf("invalid empty target address for engine %s target switchover", e.Name) + } currentTargetAddress := "" + podIP, err := commonnet.GetIPForPod() + if err != nil { + return errors.Wrapf(err, "failed to get IP for pod for engine %s target switchover", e.Name) + } + + newTargetIP, newTargetPort, err := splitHostPort(newTargetAddress) + if err != nil { + return errors.Wrapf(err, "failed to split target address %s for engine %s target switchover", newTargetAddress, e.Name) + } + e.Lock() defer func() { e.Unlock() if err != nil { - e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", targetAddress) + e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", newTargetAddress) - if disconnected, errCheck := e.IsTargetDisconnected(); errCheck != nil { - e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", targetAddress) + if disconnected, errCheck := e.isTargetDisconnected(); errCheck != nil { + e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", newTargetAddress) } else if disconnected { - if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { - e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) - } else { - e.log.Infof("Connected target back to %s", currentTargetAddress) - - if errReload := e.reloadDevice(); errReload != nil { - e.log.WithError(errReload).Warnf("Failed to reload device mapper") + if currentTargetAddress != "" { + if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { + e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) } else { - e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + e.log.Infof("Connected target back to %s", currentTargetAddress) + + if errReload := e.reloadDevice(); errReload != nil { + e.log.WithError(errReload).Warnf("Failed to reload device mapper") + } else { + e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + } } } } } else { e.ErrorMsg = "" - e.log.Infof("Switched over target to %s", targetAddress) + e.log.Infof("Switched over target to %s", newTargetAddress) } e.UpdateCh <- nil @@ -2139,6 +2234,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) } + // Check if the engine is suspended before target switchover. suspended, err := initiator.IsSuspended() if err != nil { return errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name) @@ -2147,44 +2243,46 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return fmt.Errorf("engine %s must be suspended before target switchover", e.Name) } + // Load NVMe device info before target switchover. if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s target switchover", e.Name) } } currentTargetAddress = net.JoinHostPort(initiator.TransportAddress, initiator.TransportServiceID) - if e.isSwitchOverTargetRequired(currentTargetAddress, targetAddress) { + if isSwitchOverTargetRequired(currentTargetAddress, newTargetAddress) { if currentTargetAddress != "" { if err := e.disconnectTarget(currentTargetAddress); err != nil { - return err + return errors.Wrapf(err, "failed to disconnect target %s for engine %s", currentTargetAddress, e.Name) } } - if err := e.connectTarget(targetAddress); err != nil { - return err + if err := e.connectTarget(newTargetAddress); err != nil { + return errors.Wrapf(err, "failed to connect target %s for engine %s", newTargetAddress, e.Name) } } // Replace IP and Port with the new target address. - // No need to update TargetIP and TargetPort, because target is not delete yet. - targetIP, targetPort, err := splitHostPort(targetAddress) - if err != nil { - return errors.Wrapf(err, "failed to split target address %s", targetAddress) - } + // No need to update TargetIP, because old target is not delete yet. + e.IP = newTargetIP + e.Port = newTargetPort - e.IP = targetIP - e.Port = targetPort + if newTargetIP == podIP { + e.TargetPort = newTargetPort + } else { + e.TargetPort = 0 + } e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { - return err + return errors.Wrapf(err, "failed to reload device mapper after engine %s target switchover", e.Name) } return nil } -func (e *Engine) IsTargetDisconnected() (bool, error) { +func (e *Engine) isTargetDisconnected() (bool, error) { initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return false, errors.Wrapf(err, "failed to create initiator for checking engine %s target disconnected", e.Name) @@ -2199,7 +2297,7 @@ func (e *Engine) IsTargetDisconnected() (bool, error) { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return false, errors.Wrapf(err, "failed to load NVMe device info for checking engine %s target disconnected", e.Name) } } @@ -2231,7 +2329,7 @@ func (e *Engine) disconnectTarget(targetAddress string) error { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s disconnect target %v", e.Name, targetAddress) } } @@ -2267,7 +2365,7 @@ func (e *Engine) connectTarget(targetAddress string) error { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s connect target %v:%v", e.Name, targetIP, targetPort) } } @@ -2311,30 +2409,32 @@ func (e *Engine) connectTarget(targetAddress string) error { return nil } -// DeleteTarget deletes the target +// DeleteTarget deletes the target instance func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Infof("Deleting target") + e.log.Infof("Deleting target with target port %d", e.TargetPort) - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to stop expose bdev after engine %s target switchover", e.Name) + err = spdkClient.StopExposeBdev(e.Nqn) + if err != nil { + return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name) } - if e.TargetPort != 0 { - if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { - return err - } - e.TargetPort = 0 + err = e.releaseTargetPort(superiorPortAllocator) + if err != nil { + return errors.Wrapf(err, "failed to release target port while deleting target instance for engine %s", e.Name) } - e.log.Infof("Deleting raid bdev %s before target switchover", e.Name) - if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) + e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name) + _, err = spdkClient.BdevRaidDelete(e.Name) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to delete raid bdev after engine %s while deleting target instance", e.Name) } for replicaName, replicaStatus := range e.ReplicaStatusMap { - e.log.Infof("Disconnecting replica %s after target switchover", replicaName) - if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { - e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, replicaStatus.Mode) + e.log.Infof("Disconnecting replica %s while deleting target instance", replicaName) + err = disconnectNVMfBdev(spdkClient, replicaStatus.BdevName) + if err != nil { + e.log.WithError(err).Warnf("Engine failed to disconnect replica %s while deleting target instance, will mark the replica mode from %v to ERR", + replicaName, replicaStatus.Mode) replicaStatus.Mode = types.ModeERR } replicaStatus.BdevName = "" @@ -2342,14 +2442,20 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat return nil } -func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { +func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} +func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) error { + releaseTargetPortRequired := e.TargetPort != 0 + + // Release the target port + if releaseTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { + return errors.Wrapf(err, "failed to release target port %d", e.TargetPort) + } + } + e.TargetPort = 0 -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 + return nil } diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 5075ed96..8477aa57 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -418,7 +418,7 @@ func compareSvcLvols(prev, cur *Lvol, checkChildren, checkActualSize bool) error func getExposedPort(subsystem *spdktypes.NvmfSubsystem) (exposedPort int32, err error) { if subsystem == nil || len(subsystem.ListenAddresses) == 0 { - return 0, fmt.Errorf("cannot find the Nvmf subsystem") + return 0, fmt.Errorf("cannot find the NVMf subsystem") } port := 0 @@ -434,7 +434,7 @@ func getExposedPort(subsystem *spdktypes.NvmfSubsystem) (exposedPort int32, err return int32(port), nil } - return 0, fmt.Errorf("cannot find a exposed port in the Nvmf subsystem") + return 0, fmt.Errorf("cannot find a exposed port in the NVMf subsystem") } func (r *Replica) validateReplicaHead(headBdevLvol *spdktypes.BdevInfo) (err error) { @@ -518,7 +518,7 @@ func (r *Replica) prepareHead(spdkClient *spdkclient.Client) (err error) { } if !isHeadAvailable { - r.log.Info("Creating a lvol bdev as replica Head") + r.log.Info("Creating a lvol bdev as replica head") if r.ActiveChain[len(r.ActiveChain)-1] != nil { // The replica has a backing image or somehow there are already snapshots in the chain if _, err := spdkClient.BdevLvolClone(r.ActiveChain[len(r.ActiveChain)-1].UUID, r.Name); err != nil { return err @@ -801,7 +801,7 @@ func (r *Replica) Delete(spdkClient *spdkclient.Client, cleanupRequired bool, su } if r.IsExposed { - r.log.Info("Unexposing lvol bdev for replica deletion") + r.log.Info("Unexposing bdev for replica deletion") if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(r.Name)); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return err } diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index f079257f..b64d1db3 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -878,7 +878,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 1123fc6f..cc6e243c 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1375,77 +1375,3 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { err = spdkCli.ReplicaDelete(replicaName2, false) c.Assert(err, IsNil) } - -func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { - fmt.Println("Testing SPDK Engine Creation with Upgrade Required") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - volumeName := "test-vol" - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) - c.Assert(err, IsNil) - - targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) - c.Assert(err, IsNil) - c.Assert(engine.Endpoint, Not(Equals), "") - // Initiator is not created, so the IP and Port should be empty - c.Assert(engine.IP, Equals, ip) - c.Assert(engine.Port, Not(Equals), int32(0)) - // Target is created and exposed - c.Assert(engine.TargetIP, Equals, ip) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - // Tear down engine and replicas - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) -}