Skip to content

Commit

Permalink
initiator: add methods for engine upgrade
Browse files Browse the repository at this point in the history
Longhorn 6001

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Jun 4, 2024
1 parent e7b53a9 commit 8f31293
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 37 deletions.
260 changes: 225 additions & 35 deletions pkg/nvme/initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,82 @@ func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) {
}, nil
}

func (i *Initiator) DiscoverTarget(ip, port string) (string, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

return DiscoverTarget(ip, port, i.executor)
}

func (i *Initiator) ConnectTarget(ip, port, nqn string) (string, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

return ConnectTarget(ip, port, nqn, i.executor)
}

func (i *Initiator) DisconnectTarget() error {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

return DisconnectTarget(i.SubsystemNQN, i.executor)
}

func (i *Initiator) WaitForDisconnect() (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

for r := 0; r < RetryCounts; r++ {
err = i.loadNVMeDeviceInfoWithoutLock()
if strings.Contains(err.Error(), "cannot find a valid nvme device") {
return nil
}
time.Sleep(1 * time.Second)
}

return err
}

func (i *Initiator) WaitForConnect() (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

for r := 0; r < RetryCounts; r++ {
err = i.loadNVMeDeviceInfoWithoutLock()
if err == nil {
return nil
}
time.Sleep(1 * time.Second)
}

return err
}

// Suspend suspends the device mapper device for the NVMe initiator
func (i *Initiator) Suspend(noflush, nolockfs bool) error {
if i.hostProc != "" {
Expand All @@ -88,16 +164,47 @@ func (i *Initiator) Suspend(noflush, nolockfs bool) error {
defer lock.Unlock()
}

if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil {
return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name)
suspended, err := i.isLinearDmDeviceSuspended()
if err != nil {
return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name)
}

if !suspended {
if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil {
return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name)
}
}

return nil
}

// Resume resumes the device mapper device for the NVMe initiator
func (i *Initiator) Resume() error {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()
}

if err := i.resumeLinearDmDevice(); err != nil {
return errors.Wrapf(err, "failed to resume device mapper device for NVMe initiator %s", i.Name)
}

return nil
}

func (i *Initiator) replaceDmDeviceTarget() error {
if err := i.suspendLinearDmDevice(true, false); err != nil {
return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name)
suspended, err := i.isLinearDmDeviceSuspended()
if err != nil {
return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name)
}

if !suspended {
if err := i.suspendLinearDmDevice(true, false); err != nil {
return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name)
}
}

if err := i.reloadLinearDmDevice(); err != nil {
Expand All @@ -110,14 +217,35 @@ func (i *Initiator) replaceDmDeviceTarget() error {
return nil
}

func (i *Initiator) isCleanUpDmDeviceRequired(upgradeRequired bool) (bool, error) {
if !upgradeRequired {
return true, nil
}

dmDeviceExists, err := i.isLinearDmDeviceExist()
if err != nil {
return false, errors.Wrapf(err, "failed to check linear dm device for NVMe initiator %s", i.Name)
}
if !dmDeviceExists {
return false, fmt.Errorf("linear dm device doesn't exist for NVMe initiator %s", i.Name)
}

return false, nil
}

// Start starts the NVMe initiator with the given transportAddress and transportServiceID
func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDeviceCleanup bool) (dmDeviceBusy bool, err error) {
func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceCreationRequired, upgradeRequired bool) (dmDeviceBusy bool, err error) {
defer func() {
if err != nil {
err = errors.Wrapf(err, "failed to start NVMe initiator %s", i.Name)
}
}()

dmDeviceCleanupRequired, err := i.isCleanUpDmDeviceRequired(upgradeRequired)
if err != nil {
return false, errors.Wrapf(err, "failed to check if cleaning up linear device is required for NVMe initiator %s", i.Name)
}

if transportAddress == "" || transportServiceID == "" {
return false, fmt.Errorf("invalid TransportAddress %s and TransportServiceID %s for initiator %s start", transportAddress, transportServiceID, i.Name)
}
Expand Down Expand Up @@ -149,10 +277,14 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
}

i.logger.Infof("Stopping NVMe initiator blindly before starting")
dmDeviceBusy, err = i.stopWithoutLock(needDmDeviceCleanup, false, false)
dmDeviceBusy, err = i.stopWithoutLock(dmDeviceCleanupRequired, false, false)
if err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe initiator %s before starting", i.Name)
}
if upgradeRequired {
i.logger.Info("Linear dm device is busy, since upgrade is required")
dmDeviceBusy = true
}

i.logger.WithFields(logrus.Fields{
"transportAddress": transportAddress,
Expand Down Expand Up @@ -191,42 +323,55 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
return dmDeviceBusy, errors.Wrapf(err, "failed to load device info after starting NVMe initiator %s", i.Name)
}

needMakeEndpoint := true
if needDmDeviceCleanup {
if dmDeviceBusy {
// Endpoint is already created, just replace the target device
needMakeEndpoint = false
i.logger.Infof("Linear dm device %s is busy, trying the best to replace the target device for NVMe initiator %s", i.Name, i.Name)
if err := i.replaceDmDeviceTarget(); err != nil {
i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name)
if dmDeviceCreationRequired {
needMakeEndpoint := true
if dmDeviceCleanupRequired {
if dmDeviceBusy {
// Endpoint is already created, just replace the target device
needMakeEndpoint = false
i.logger.Infof("Linear dm device is busy, trying the best to replace the target device for NVMe initiator %s", i.Name)
if err := i.replaceDmDeviceTarget(); err != nil {
i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name)
} else {
i.logger.Infof("Successfully replaced the target device for NVMe initiator %s", i.Name)
dmDeviceBusy = false
}
} else {
i.logger.Infof("Successfully replaced the target device for NVMe initiator %s", i.Name)
dmDeviceBusy = false
i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name)
if err := i.createLinearDmDevice(); err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name)
}
}
} else {
i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name)
if err := i.createLinearDmDevice(); err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name)
if upgradeRequired {
needMakeEndpoint = false
i.logger.Info("Replacing target device of linear dm device for NVMe initiator")
if err := i.replaceDmDeviceTarget(); err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to reload linear dm device for NVMe initiator %s for upgrade", i.Name)
}

i.logger.Infof("Successfully replaced the target device for NVMe initiator %s for upgrade", i.Name)
dmDeviceBusy = false
} else {
i.logger.Infof("Skipping creating linear dm device for NVMe initiator %s", i.Name)
}

i.dev.Export = i.dev.Nvme
}
} else {
i.logger.Infof("Skipping creating linear dm device for NVMe initiator %s", i.Name)
i.dev.Export = i.dev.Nvme
}

if needMakeEndpoint {
i.logger.Infof("Creating endpoint %v", i.Endpoint)
if err := i.makeEndpoint(); err != nil {
return dmDeviceBusy, err
if needMakeEndpoint {
i.logger.Infof("Creating endpoint %v", i.Endpoint)
if err := i.makeEndpoint(); err != nil {
return dmDeviceBusy, err
}
}
}

i.logger.Infof("Launched NVMe initiator: %+v", i)

return dmDeviceBusy, nil
}

func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
func (i *Initiator) Stop(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
Expand All @@ -235,7 +380,7 @@ func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmD
defer lock.Unlock()
}

return i.stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice)
return i.stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice)
}

func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
Expand All @@ -254,9 +399,9 @@ func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmD
return false, nil
}

func (i *Initiator) stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
func (i *Initiator) stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
dmDeviceBusy := false
if needDmDeviceCleanup {
if dmDeviceCleanupRequired {
var err error
dmDeviceBusy, err = i.removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice)
if err != nil {
Expand Down Expand Up @@ -378,8 +523,16 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error {
if dmDeviceBusy {
i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %s due to device busy", i.Endpoint, i.Name)
} else {
if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) {
return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Nvme.Name, i.Endpoint, i.Name)
suspended, err := i.isLinearDmDeviceSuspended()
if err != nil {
return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name)
}
if !suspended {
if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) {
return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Name, i.Endpoint, i.Name)
}
} else {
i.logger.Infof("Skipping endpoint %v loading for NVMe initiator %s due to linear dm device suspended", i.Endpoint, i.Name)
}
}

Expand Down Expand Up @@ -482,6 +635,18 @@ func (i *Initiator) resumeLinearDmDevice() error {
return util.DmsetupResume(i.Name, i.executor)
}

func (i *Initiator) ReloadDmDevice() (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for NVMe initiator %s", i.Name)
}
defer lock.Unlock()
}

return i.reloadLinearDmDevice()
}

func (i *Initiator) reloadLinearDmDevice() error {
devPath := fmt.Sprintf("/dev/%s", i.dev.Nvme.Name)

Expand All @@ -500,11 +665,36 @@ func (i *Initiator) reloadLinearDmDevice() error {

table := fmt.Sprintf("0 %v linear %v 0", sectors, devPath)

logrus.Infof("Reloading linear dm device %s with table %s", i.Name, table)
logrus.Infof("Reloading linear dm device %s with table '%s'", i.Name, table)

return util.DmsetupReload(i.Name, table, i.executor)
}

func (i *Initiator) isLinearDmDeviceExist() (bool, error) {
dmDevPath := getDmDevicePath(i.Name)
if _, err := os.Stat(dmDevPath); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}

func getDmDevicePath(name string) string {
return fmt.Sprintf("/dev/mapper/%s", name)
}

func (i *Initiator) isLinearDmDeviceSuspended() (bool, error) {
devices, err := util.DmsetupInfo(i.Name, i.executor)
if err != nil {
return false, err
}

for _, device := range devices {
if device.Name == i.Name {
return device.Suspended, nil
}
}
return false, fmt.Errorf("failed to find linear dm device %s", i.Name)
}
2 changes: 1 addition & 1 deletion pkg/spdk/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (s *TestSuite) TestSPDKBasic(c *C) {
initiator, err := nvme.NewInitiator(raidName, nqn, nvme.HostProc)
c.Assert(err, IsNil)

dmDeviceBusy, err := initiator.Start(types.LocalIP, defaultPort1, true)
dmDeviceBusy, err := initiator.Start(types.LocalIP, defaultPort1, true, false)
c.Assert(dmDeviceBusy, Equals, false)
c.Assert(err, IsNil)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func DetectDevice(path string, executor *commonNs.Executor) (*BlockDevice, error
f := strings.Fields(line)
if len(f) == 2 {
dev = &BlockDevice{
Name: f[0],
Name: f[0],
}
_, err = fmt.Sscanf(f[1], "%d:%d", &dev.Major, &dev.Minor)
if err != nil {
Expand Down

0 comments on commit 8f31293

Please sign in to comment.