From 163c5e7e252af6d62087109b4a01571fc205a549 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 8 Dec 2024 16:53:41 +0800 Subject: [PATCH] refactor: clean up engine creation and target handling Remove the upgradeRequired flag from engine creation and derive the initiator and target creation requirements from the pod, initiator, and target addresses instead. Longhorn 9919 Signed-off-by: Derek Su --- pkg/client/client.go | 3 +- pkg/spdk/engine.go | 516 +++++++++++++++++++++++++------------------ pkg/spdk/replica.go | 8 +- pkg/spdk/server.go | 2 +- pkg/spdk_test.go | 86 +------- 5 files changed, 319 insertions(+), 296 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 110069f8..8662fad1 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -474,7 +474,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,7 +490,6 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, - UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, SalvageRequested: salvageRequested, diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 5a11e76d..2f3773e3 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "go.uber.org/multierr" grpccodes "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -41,9 +42,9 @@ type Engine struct { SpecSize uint64 ActualSize uint64 IP string - Port int32 + Port int32 // Port that the initiator connects to TargetIP string - TargetPort int32 + TargetPort int32 // Port exposed by the target for the initiator to connect to Frontend string Endpoint string Nqn string @@ -54,8 +55,8 @@ type Engine struct { ReplicaStatusMap map[string]*EngineReplicaStatus - initiator *nvme.Initiator - dmDeviceBusy bool + initiator *nvme.Initiator + dmDeviceIsBusy bool State types.InstanceState ErrorMsg string @@ -113,10 +114,41 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) isNewEngine() bool { + return e.IP == "" && e.TargetIP == "" +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + if e.Port == 0 && e.TargetPort == 0 { + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort) + } + } else if 
podIP == initiatorIP { + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target addresses for engine %s creation with initiator address %v and target address %v", e.Name, initiatorIP, targetIP) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, - "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, @@ -128,7 +160,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -143,6 +174,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -177,29 +209,30 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if e.isNewEngine() { + if initiatorCreationRequired { + e.IP = initiatorIP + } + e.TargetIP = targetIP + } - // For creating target on attached node - initiatorCreationRequired = false + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) + + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -231,18 +264,15 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + // TODO: improve the log message + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - 
// For reconstructing engine after switching over target to another node - initiatorCreationRequired = false - - e.IP = targetIP + e.log.Info("Skipping target creation during engine creation") - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -253,21 +283,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -276,12 +302,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -366,7 +400,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type 
%s", e.Frontend) } @@ -376,45 +411,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) + targetIP, targetPort, err := splitHostPort(targetAddress) if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + return err } - targetIP, targetPort, err := splitHostPort(targetAddress) + e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) + + dmDeviceIsBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { - return errors.Wrapf(err, "failed to split target address %v", targetAddress) + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) } - e.Nqn = helpertypes.GetNQN(e.Name) + defer func() { + if err == nil { + e.initiator = initiator + e.dmDeviceIsBusy = dmDeviceIsBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + + e.log.Infof("Finished handling frontend for engine: %+v", e) + } + }() - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + // TODO: + // "nvme list -o json" might return an empty device list for a while after the instance manager pod has just started. + // The root cause is not clear, so retry loading the NVMe device info. 
+ for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Infof("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - port, _, err = superiorPortAllocator.AllocateRange(portCount) + err = initiator.LoadEndpoint(false) if err != nil { - return err + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err - } + return nil + } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port - } else { - e.TargetPort = port - } - } else { - e.Port = targetPort + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + e.TargetPort = port + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -422,57 +499,20 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + dmDeviceIsBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } - - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log 
= e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Info("Deleting engine") - requireUpdate := false e.Lock() @@ -502,37 +542,35 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *co } }() - if e.Endpoint != "" { - nqn := helpertypes.GetNQN(e.Name) + e.log.Info("Deleting engine") - if e.initiator != nil { - if _, err := e.initiator.Stop(true, true, true); err != nil { - return err - } - e.initiator = nil - } + if e.Nqn == "" { + e.Nqn = helpertypes.GetNQN(e.Name) + } - if err := spdkClient.StopExposeBdev(nqn); err != nil { + // Stop the frontend + if e.initiator != nil { + if _, err := e.initiator.Stop(true, true, true); err != nil { return err } - + e.initiator = nil e.Endpoint = "" + requireUpdate = true } - if e.TargetPort != 0 || e.Port != 0 { - port := e.TargetPort - if port == 0 { - port = e.Port - } - if err := superiorPortAllocator.ReleaseRange(port, port); err != nil { - return err - } - e.TargetPort = 0 - e.Port = 0 - requireUpdate = true + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return err + } + + // Release the ports if they are allocated + err = e.releasePorts(superiorPortAllocator) + if err != nil { + return err } + requireUpdate = true + // Delete the Raid bdev and disconnect the replicas if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return err } @@ -555,6 +593,41 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *co return nil } +func (e *Engine) releasePorts(superiorPortAllocator *commonbitmap.Bitmap) (err error) { + ports := map[int32]struct{}{ + e.Port: {}, + e.TargetPort: {}, + } + + if errRelease := releasePortIfExists(superiorPortAllocator, ports, e.Port); errRelease != nil { + err = multierr.Append(err, errRelease) + } + e.Port = 0 + + if errRelease := releasePortIfExists(superiorPortAllocator, ports, e.TargetPort); errRelease != nil { + err = multierr.Append(err, errRelease) + } + e.TargetPort = 0 + + return err +} + +func releasePortIfExists(superiorPortAllocator *commonbitmap.Bitmap, ports map[int32]struct{}, port int32) error { + if port == 0 { + return nil + } + + _, exists := ports[port] + if exists { + if err := superiorPortAllocator.ReleaseRange(port, port); err != nil { + return err + } + delete(ports, port) + } + + return nil +} + func (e *Engine) Get() (res *spdkrpc.Engine) { e.RLock() defer e.RUnlock() @@ -639,16 +712,8 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -659,6 +724,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { if spdktypes.GetBdevType(bdevRaid) != spdktypes.BdevTypeRaid { return fmt.Errorf("cannot find a raid bdev for engine %v", e.Name) } + bdevRaidSize := bdevRaid.NumBlocks * uint64(bdevRaid.BlockSize) if e.SpecSize != bdevRaidSize { return fmt.Errorf("found mismatching between engine spec size %d and actual 
raid bdev size %d for engine %s", e.SpecSize, bdevRaidSize, e.Name) } @@ -694,6 +760,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { containValidReplica = true } } + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) if !containValidReplica { @@ -834,16 +901,22 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { } func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.NvmfSubsystem) (err error) { - if e.Frontend != types.FrontendEmpty && e.Frontend != types.FrontendSPDKTCPNvmf && e.Frontend != types.FrontendSPDKTCPBlockdev { + if e.Frontend != types.FrontendEmpty && + e.Frontend != types.FrontendSPDKTCPNvmf && + e.Frontend != types.FrontendSPDKTCPBlockdev { return fmt.Errorf("unknown frontend type %s", e.Frontend) } nqn := helpertypes.GetNQN(e.Name) - subsystem := subsystemMap[nqn] + + subsystem, ok := subsystemMap[nqn] + if !ok && e.Frontend != types.FrontendEmpty { + return fmt.Errorf("cannot find the NVMf subsystem for engine %s", e.Name) + } if e.Frontend == types.FrontendEmpty { if subsystem != nil { - return fmt.Errorf("found nvmf subsystem %s for engine %s with empty frontend", nqn, e.Name) + return fmt.Errorf("found NVMf subsystem %s for engine %s with empty frontend", nqn, e.Name) } if e.Endpoint != "" { return fmt.Errorf("found non-empty endpoint %s for engine %s with empty frontend", e.Endpoint, e.Name) @@ -855,7 +928,7 @@ } if subsystem == nil || len(subsystem.ListenAddresses) == 0 { - return fmt.Errorf("cannot find the Nvmf subsystem for engine %s", e.Name) + return fmt.Errorf("cannot find the NVMf subsystem for engine %s", e.Name) } port := 0 @@ -872,7 +945,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv } } if port == 0 || e.Port != int32(port) { - return fmt.Errorf("cannot find a matching listener with port %d from Nvmf subsystem for engine %s", e.Port, e.Name) + return fmt.Errorf("cannot find a matching listener with port %d from NVMf subsystem for engine %s", e.Port, e.Name) } switch e.Frontend { @@ -880,7 +953,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv if e.initiator == nil { initiator, err := nvme.NewInitiator(e.VolumeName, nqn, nvme.HostProc) if err != nil { - return err + return errors.Wrapf(err, "failed to create initiator for engine %v during frontend validation and update", e.Name) } e.initiator = initiator } @@ -892,7 +965,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv } return err } - if err := e.initiator.LoadEndpoint(e.dmDeviceBusy); err != nil { + if err := e.initiator.LoadEndpoint(e.dmDeviceIsBusy); err != nil { return err } blockDevEndpoint := e.initiator.GetEndpoint() @@ -2037,8 +2110,7 @@ func (e *Engine) Suspend(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to suspend engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to suspend engine") } e.ErrorMsg = err.Error() } else { @@ -2069,8 +2141,7 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { if err != nil { if e.State != types.InstanceStateError { - e.State = types.InstanceStateError - e.log.WithError(err).Info("Failed to resume engine, will mark the engine as error") + e.log.WithError(err).Warn("Failed to resume engine") } e.ErrorMsg = err.Error() } else { @@ -2098,37 +2169,53 @@ func (e 
*Engine) Resume(spdkClient *spdkclient.Client) (err error) { } // SwitchOverTarget function in the Engine struct is responsible for switching the engine's target to a new address. -func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress string) (err error) { - e.log.Infof("Switching over engine to target address %s", targetAddress) +func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddress string) (err error) { + e.log.Infof("Switching over engine to target address %s", newTargetAddress) + + if newTargetAddress == "" { + return fmt.Errorf("invalid empty target address for engine %s target switchover", e.Name) + } currentTargetAddress := "" + podIP, err := commonnet.GetIPForPod() + if err != nil { + return errors.Wrapf(err, "failed to get IP for pod for engine %s target switchover", e.Name) + } + + newTargetIP, newTargetPort, err := splitHostPort(newTargetAddress) + if err != nil { + return errors.Wrapf(err, "failed to split target address %s for engine %s target switchover", newTargetAddress, e.Name) + } + e.Lock() defer func() { e.Unlock() if err != nil { - e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", targetAddress) + e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", newTargetAddress) - if disconnected, errCheck := e.IsTargetDisconnected(); errCheck != nil { - e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", targetAddress) + if disconnected, errCheck := e.isTargetDisconnected(); errCheck != nil { + e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", newTargetAddress) } else if disconnected { - if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { - e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) - } else { - e.log.Infof("Connected target back to %s", currentTargetAddress) - - if errReload := e.reloadDevice(); errReload != nil { - e.log.WithError(errReload).Warnf("Failed to reload device mapper") + if currentTargetAddress != "" { + if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { + e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) } else { - e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + e.log.Infof("Connected target back to %s", currentTargetAddress) + + if errReload := e.reloadDevice(); errReload != nil { + e.log.WithError(errReload).Warnf("Failed to reload device mapper") + } else { + e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + } } } } } else { e.ErrorMsg = "" - e.log.Infof("Switched over target to %s", targetAddress) + e.log.Infof("Switched over target to %s", newTargetAddress) } e.UpdateCh <- nil @@ -2139,6 +2226,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) } + // Check if the engine is suspended before target switchover. suspended, err := initiator.IsSuspended() if err != nil { return errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name) @@ -2147,44 +2235,46 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return fmt.Errorf("engine %s must be suspended before target switchover", e.Name) } + // Load NVMe device info before target switchover. 
if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s target switchover", e.Name) } } currentTargetAddress = net.JoinHostPort(initiator.TransportAddress, initiator.TransportServiceID) - if e.isSwitchOverTargetRequired(currentTargetAddress, targetAddress) { + if isSwitchOverTargetRequired(currentTargetAddress, newTargetAddress) { if currentTargetAddress != "" { if err := e.disconnectTarget(currentTargetAddress); err != nil { - return err + return errors.Wrapf(err, "failed to disconnect target %s for engine %s", currentTargetAddress, e.Name) } } - if err := e.connectTarget(targetAddress); err != nil { - return err + if err := e.connectTarget(newTargetAddress); err != nil { + return errors.Wrapf(err, "failed to connect target %s for engine %s", newTargetAddress, e.Name) } } // Replace IP and Port with the new target address. - // No need to update TargetIP and TargetPort, because target is not delete yet. - targetIP, targetPort, err := splitHostPort(targetAddress) - if err != nil { - return errors.Wrapf(err, "failed to split target address %s", targetAddress) - } + // No need to update TargetIP, because old target is not deleted yet. + e.IP = newTargetIP + e.Port = newTargetPort - e.IP = targetIP - e.Port = targetPort + if newTargetIP == podIP { + e.TargetPort = newTargetPort + } else { + e.TargetPort = 0 + } e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { - return err + return errors.Wrapf(err, "failed to reload device mapper after engine %s target switchover", e.Name) } return nil } -func (e *Engine) IsTargetDisconnected() (bool, error) { +func (e *Engine) isTargetDisconnected() (bool, error) { initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return false, errors.Wrapf(err, "failed to create initiator for checking engine %s target disconnected", e.Name) } @@ -2199,7 +2289,7 @@ func (e *Engine) IsTargetDisconnected() (bool, error) { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return false, errors.Wrapf(err, "failed to load NVMe device info for checking engine %s target disconnected", e.Name) } } @@ -2231,7 +2321,7 @@ func (e *Engine) disconnectTarget(targetAddress string) error { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s disconnect target %v", e.Name, targetAddress) } } @@ -2267,7 +2357,7 @@ func (e *Engine) connectTarget(targetAddress string) error { } if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { - if !nvme.IsValidNvmeDeviceNotFound(err) { + if !helpertypes.ErrorIsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s connect target %v:%v", e.Name, targetIP, targetPort) } } @@ -2311,30 +2401,32 @@ func (e *Engine) connectTarget(targetAddress string) error { return 
nil } -// DeleteTarget deletes the target +// DeleteTarget deletes the target instance func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Infof("Deleting target") + e.log.Infof("Deleting target with target port %d", e.TargetPort) - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to stop expose bdev after engine %s target switchover", e.Name) + err = spdkClient.StopExposeBdev(e.Nqn) + if err != nil { + return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name) } - if e.TargetPort != 0 { - if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { - return err - } - e.TargetPort = 0 + err = e.releaseTargetPort(superiorPortAllocator) + if err != nil { + return errors.Wrapf(err, "failed to release target port while deleting target instance for engine %s", e.Name) } - e.log.Infof("Deleting raid bdev %s before target switchover", e.Name) - if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) + e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name) + _, err = spdkClient.BdevRaidDelete(e.Name) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to delete raid bdev for engine %s while deleting target instance", e.Name) } for replicaName, replicaStatus := range e.ReplicaStatusMap { - e.log.Infof("Disconnecting replica %s after target switchover", replicaName) - if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { - e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, replicaStatus.Mode) + e.log.Infof("Disconnecting replica %s while deleting target instance", replicaName) + err = disconnectNVMfBdev(spdkClient, replicaStatus.BdevName) + if err != nil { + e.log.WithError(err).Warnf("Engine failed to disconnect replica %s while deleting target instance, will mark the replica mode from %v to ERR", + replicaName, replicaStatus.Mode) replicaStatus.Mode = types.ModeERR } replicaStatus.BdevName = "" @@ -2342,14 +2434,20 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat return nil } -func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { +func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} +func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) error { + releaseTargetPortRequired := e.TargetPort != 0 + + // Release the target port + if releaseTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { + return errors.Wrapf(err, "failed to release target port %d", e.TargetPort) + } + } + e.TargetPort = 0 -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 + return nil } diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 5075ed96..8477aa57 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -418,7 +418,7 @@ func compareSvcLvols(prev, cur *Lvol, checkChildren, checkActualSize bool) error func 
getExposedPort(subsystem *spdktypes.NvmfSubsystem) (exposedPort int32, err error) { if subsystem == nil || len(subsystem.ListenAddresses) == 0 { - return 0, fmt.Errorf("cannot find the Nvmf subsystem") + return 0, fmt.Errorf("cannot find the NVMf subsystem") } port := 0 @@ -434,7 +434,7 @@ func getExposedPort(subsystem *spdktypes.NvmfSubsystem) (exposedPort int32, err return int32(port), nil } - return 0, fmt.Errorf("cannot find a exposed port in the Nvmf subsystem") + return 0, fmt.Errorf("cannot find a exposed port in the NVMf subsystem") } func (r *Replica) validateReplicaHead(headBdevLvol *spdktypes.BdevInfo) (err error) { @@ -518,7 +518,7 @@ func (r *Replica) prepareHead(spdkClient *spdkclient.Client) (err error) { } if !isHeadAvailable { - r.log.Info("Creating a lvol bdev as replica Head") + r.log.Info("Creating a lvol bdev as replica head") if r.ActiveChain[len(r.ActiveChain)-1] != nil { // The replica has a backing image or somehow there are already snapshots in the chain if _, err := spdkClient.BdevLvolClone(r.ActiveChain[len(r.ActiveChain)-1].UUID, r.Name); err != nil { return err @@ -801,7 +801,7 @@ func (r *Replica) Delete(spdkClient *spdkclient.Client, cleanupRequired bool, su } if r.IsExposed { - r.log.Info("Unexposing lvol bdev for replica deletion") + r.log.Info("Unexposing bdev for replica deletion") if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(r.Name)); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return err } diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index f079257f..b64d1db3 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -878,7 +878,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 1123fc6f..cc6e243c 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ 
func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1375,77 +1375,3 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { err = spdkCli.ReplicaDelete(replicaName2, false) c.Assert(err, IsNil) } - -func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { - fmt.Println("Testing SPDK Engine Creation with Upgrade Required") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := 
spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - volumeName := "test-vol" - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) - c.Assert(err, IsNil) - - targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) - c.Assert(err, IsNil) - c.Assert(engine.Endpoint, Not(Equals), "") - // Initiator is not created, so the IP and Port should be empty - c.Assert(engine.IP, Equals, ip) - c.Assert(engine.Port, Not(Equals), int32(0)) - // Target is created and exposed - c.Assert(engine.TargetIP, Equals, ip) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - // Tear down engine and replicas - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) -}
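
Reviewer note (not part of the patch): a minimal sketch of how a caller uses the refactored client API after this change. The upgradeRequired argument is gone from EngineCreate; whether the initiator and/or the target instance is created is now derived from the initiator and target addresses. The module import paths, names, addresses, and sizes below are illustrative assumptions modeled on the existing tests, not code from this patch.

package main

import (
	"fmt"
	"net"
	"strconv"

	"github.com/longhorn/longhorn-spdk-engine/pkg/client" // assumed module path
	"github.com/longhorn/longhorn-spdk-engine/pkg/types"  // assumed module path
)

func main() {
	ip := "10.0.0.5" // hypothetical instance-manager pod IP

	spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort)))
	if err != nil {
		panic(err)
	}

	// Hypothetical replica addresses; in practice these come from the running replicas.
	replicaAddressMap := map[string]string{
		"vol-r-0": net.JoinHostPort(ip, "20001"),
		"vol-r-1": net.JoinHostPort(ip, "20011"),
	}

	// Passing the same initiator and target address creates both the initiator and the
	// target instance on this node; passing different addresses creates only the
	// instance that belongs to this pod.
	engine, err := spdkCli.EngineCreate("vol-e-0", "vol", types.FrontendSPDKTCPBlockdev,
		10*1024*1024*1024, replicaAddressMap, 1, ip, ip, false /* salvageRequested */)
	if err != nil {
		panic(err)
	}
	fmt.Println(engine.State, engine.Endpoint)
}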