diff --git a/pkg/api/types.go b/pkg/api/types.go index cb30a325..1f600307 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -129,6 +129,7 @@ type Engine struct { Port int32 `json:"port"` TargetIP string `json:"target_ip"` TargetPort int32 `json:"target_port"` + StandbyTargetPort int32 `json:"standby_target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -149,6 +150,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { Port: e.Port, TargetIP: e.TargetIp, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/pkg/client/client.go b/pkg/client/client.go index 110069f8..8662fad1 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -474,7 +474,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,7 +490,6 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, - UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, SalvageRequested: salvageRequested, diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 1bc3a4f2..a8e5fdc8 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -36,18 +36,19 @@ import ( type Engine struct { sync.RWMutex - Name string - VolumeName string - SpecSize uint64 - ActualSize uint64 - IP string - Port int32 - TargetIP string - TargetPort int32 - Frontend string - Endpoint string - Nqn string - Nguid string + Name string + VolumeName string + SpecSize uint64 + ActualSize uint64 + IP string + Port int32 // Port that initiator is connecting to + TargetIP string + TargetPort int32 // Port of the target that is used for letting initiator connect to + StandbyTargetPort int32 + Frontend string + Endpoint string + Nqn string + Nguid string ctrlrLossTimeout int fastIOFailTimeoutSec int @@ -113,10 +114,45 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) isNewEngine() bool { + return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0 +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + if e.Port == 0 && e.TargetPort == 0 { + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + e.log.Info("Creating a target instance") + if e.StandbyTargetPort != 0 { + e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort) + } else { + targetCreationRequired = true + } + } else { + e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort) + } + } else if podIP == initiatorIP { + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target addresses for engine %s creation with initiator address %v and target address %v", e.Name, initiatorIP, targetIP) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, - "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, @@ -128,7 +164,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -143,6 +178,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -177,29 +213,30 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if e.isNewEngine() { + if initiatorCreationRequired { + e.IP = initiatorIP + } + e.TargetIP = targetIP + } - // For creating target on attached node - initiatorCreationRequired = false + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) + + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -231,18 +268,15 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + // TODO: improve the log message + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - // For reconstructing engine after switching over target to another node - initiatorCreationRequired = false - - e.IP = targetIP + e.log.Info("Skipping target creation during engine creation") - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -253,21 +287,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -276,12 +306,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -366,7 +404,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) } @@ -376,9 +415,9 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) - if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + standbyTargetCreationRequired := false + if e.Port != 0 && e.TargetPort == 0 { + standbyTargetCreationRequired = true } targetIP, targetPort, err := splitHostPort(targetAddress) @@ -387,34 +426,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, } e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) + + dmDeviceBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) + } - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + defer func() { + if err == nil { + if !standbyTargetCreationRequired { + e.initiator = initiator + e.dmDeviceBusy = dmDeviceBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + } - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + e.log.Infof("Finished handling frontend for engine: %+v", e) } + }() - port, _, err = superiorPortAllocator.AllocateRange(portCount) - if err != nil { - return err + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort + + // TODO: + // "nvme list -o json" might be empty devices for a while instance manager pod is just started. + // The root cause is not clear, so we need to retry to load NVMe device info. + for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Infof("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err + err = initiator.LoadEndpoint(false) + if err != nil { + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port + return nil + } + + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + if standbyTargetCreationRequired { + e.StandbyTargetPort = port } else { e.TargetPort = port } - } else { - e.Port = targetPort + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -422,50 +514,17 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) - if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } + e.log.Info("Starting initiator for engine") - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) + if err != nil { + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } @@ -573,6 +632,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Port: e.Port, TargetIp: e.TargetIP, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, Snapshots: map[string]*spdkrpc.Lvol{}, Frontend: e.Frontend, Endpoint: e.Endpoint, @@ -639,16 +699,8 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -2096,37 +2148,53 @@ func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) { } // SwitchOverTarget function in the Engine struct is responsible for switching the engine's target to a new address. -func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress string) (err error) { - e.log.Infof("Switching over engine to target address %s", targetAddress) +func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddress string) (err error) { + e.log.Infof("Switching over engine to target address %s", newTargetAddress) + + if newTargetAddress == "" { + return fmt.Errorf("invalid empty target address for engine %s target switchover", e.Name) + } currentTargetAddress := "" + podIP, err := commonnet.GetIPForPod() + if err != nil { + return errors.Wrapf(err, "failed to get IP for pod for engine %s target switchover", e.Name) + } + + newTargetIP, newTargetPort, err := splitHostPort(newTargetAddress) + if err != nil { + return errors.Wrapf(err, "failed to split target address %s for engine %s target switchover", newTargetAddress, e.Name) + } + e.Lock() defer func() { e.Unlock() if err != nil { - e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", targetAddress) + e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", newTargetAddress) - if disconnected, errCheck := e.IsTargetDisconnected(); errCheck != nil { - e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", targetAddress) + if disconnected, errCheck := e.isTargetDisconnected(); errCheck != nil { + e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", newTargetAddress) } else if disconnected { - if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { - e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) - } else { - e.log.Infof("Connected target back to %s", currentTargetAddress) - - if errReload := e.reloadDevice(); errReload != nil { - e.log.WithError(errReload).Warnf("Failed to reload device mapper") + if currentTargetAddress != "" { + if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil { + e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress) } else { - e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + e.log.Infof("Connected target back to %s", currentTargetAddress) + + if errReload := e.reloadDevice(); errReload != nil { + e.log.WithError(errReload).Warnf("Failed to reload device mapper") + } else { + e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress) + } } } } } else { e.ErrorMsg = "" - e.log.Infof("Switched over target to %s", targetAddress) + e.log.Infof("Switched over target to %s", newTargetAddress) } e.UpdateCh <- nil @@ -2137,6 +2205,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) } + // Check if the engine is suspended before target switchover. suspended, err := initiator.IsSuspended() if err != nil { return errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name) @@ -2145,6 +2214,7 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s return fmt.Errorf("engine %s must be suspended before target switchover", e.Name) } + // Load NVMe device info before target switchover. if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { if !nvme.IsValidNvmeDeviceNotFound(err) { return errors.Wrapf(err, "failed to load NVMe device info for engine %s target switchover", e.Name) @@ -2152,37 +2222,40 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s } currentTargetAddress = net.JoinHostPort(initiator.TransportAddress, initiator.TransportServiceID) - if e.isSwitchOverTargetRequired(currentTargetAddress, targetAddress) { + if isSwitchOverTargetRequired(currentTargetAddress, newTargetAddress) { if currentTargetAddress != "" { if err := e.disconnectTarget(currentTargetAddress); err != nil { - return err + return errors.Wrapf(err, "failed to disconnect target %s for engine %s", currentTargetAddress, e.Name) } } - if err := e.connectTarget(targetAddress); err != nil { - return err + if err := e.connectTarget(newTargetAddress); err != nil { + return errors.Wrapf(err, "failed to connect target %s for engine %s", newTargetAddress, e.Name) } } // Replace IP and Port with the new target address. - // No need to update TargetIP and TargetPort, because target is not delete yet. - targetIP, targetPort, err := splitHostPort(targetAddress) - if err != nil { - return errors.Wrapf(err, "failed to split target address %s", targetAddress) - } + // No need to update TargetIP, because old target is not delete yet. + e.IP = newTargetIP + e.Port = newTargetPort - e.IP = targetIP - e.Port = targetPort + if newTargetIP == podIP { + e.TargetPort = newTargetPort + e.StandbyTargetPort = 0 + } else { + e.StandbyTargetPort = e.TargetPort + e.TargetPort = 0 + } e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { - return err + return errors.Wrapf(err, "failed to reload device mapper after engine %s target switchover", e.Name) } return nil } -func (e *Engine) IsTargetDisconnected() (bool, error) { +func (e *Engine) isTargetDisconnected() (bool, error) { initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return false, errors.Wrapf(err, "failed to create initiator for checking engine %s target disconnected", e.Name) @@ -2309,30 +2382,32 @@ func (e *Engine) connectTarget(targetAddress string) error { return nil } -// DeleteTarget deletes the target +// DeleteTarget deletes the target instance func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Infof("Deleting target") + e.log.Infof("Deleting target with target port %d and standby target port %d", e.TargetPort, e.StandbyTargetPort) - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to stop expose bdev after engine %s target switchover", e.Name) + err = spdkClient.StopExposeBdev(e.Nqn) + if err != nil { + return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name) } - if e.TargetPort != 0 { - if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { - return err - } - e.TargetPort = 0 + err = e.releaseTargetAndStandbyTargetPorts(superiorPortAllocator) + if err != nil { + return errors.Wrapf(err, "failed to release target and standby target ports while deleting target instance for engine %s", e.Name) } - e.log.Infof("Deleting raid bdev %s before target switchover", e.Name) - if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) + e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name) + _, err = spdkClient.BdevRaidDelete(e.Name) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to delete raid bdev after engine %s while deleting target instance", e.Name) } for replicaName, replicaStatus := range e.ReplicaStatusMap { - e.log.Infof("Disconnecting replica %s after target switchover", replicaName) - if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { - e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, replicaStatus.Mode) + e.log.Infof("Disconnecting replica %s while deleting target instance", replicaName) + err = disconnectNVMfBdev(spdkClient, replicaStatus.BdevName) + if err != nil { + e.log.WithError(err).Warnf("Engine failed to disconnect replica %s while deleting target instance, will mark the replica mode from %v to ERR", + replicaName, replicaStatus.Mode) replicaStatus.Mode = types.ModeERR } replicaStatus.BdevName = "" @@ -2340,14 +2415,29 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat return nil } -func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { +func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} +func (e *Engine) releaseTargetAndStandbyTargetPorts(superiorPortAllocator *commonbitmap.Bitmap) error { + releaseTargetPortRequired := e.TargetPort != 0 + releaseStandbyTargetPortRequired := e.StandbyTargetPort != 0 && e.StandbyTargetPort != e.TargetPort + + // Release the target port + if releaseTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { + return errors.Wrapf(err, "failed to release target port %d", e.TargetPort) + } + } + e.TargetPort = 0 -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 + // Release the standby target port + if releaseStandbyTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.StandbyTargetPort, e.StandbyTargetPort); err != nil { + return errors.Wrapf(err, "failed to release standby target port %d", e.StandbyTargetPort) + } + } + e.StandbyTargetPort = 0 + + return nil } diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go new file mode 100644 index 00000000..4780430f --- /dev/null +++ b/pkg/spdk/engine_test.go @@ -0,0 +1,233 @@ +package spdk + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + commonbitmap "github.com/longhorn/go-common-libs/bitmap" + + . "gopkg.in/check.v1" +) + +func (s *TestSuite) TestCheckInitiatorAndTargetCreationRequirements(c *C) { + testCases := []struct { + name string + podIP string + initiatorIP string + targetIP string + port int32 + targetPort int32 + standbyTargetPort int32 + expectedInitiatorCreationRequired bool + expectedTargetCreationRequired bool + expectedError error + }{ + { + name: "Create both initiator and target instances", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local target instance on the node with initiator instance", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 8080, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local initiator instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + { + name: "Create local target instance on the node without initiator instance", + podIP: "192.168.1.2", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Invalid initiator and target addresses", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.2", + targetIP: "192.168.1.3", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: fmt.Errorf("invalid initiator and target addresses for engine test-engine creation with initiator address 192.168.1.2 and target address 192.168.1.3"), + }, + { + name: "Standby target instance is already created", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 100, + targetPort: 0, + standbyTargetPort: 105, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + } + for testName, testCase := range testCases { + c.Logf("testing checkInitiatorAndTargetCreationRequirements.%v", testName) + + engine := &Engine{ + Port: testCase.port, + TargetPort: testCase.targetPort, + StandbyTargetPort: testCase.standbyTargetPort, + Name: "test-engine", + log: logrus.New(), + } + + initiatorCreationRequired, targetCreationRequired, err := engine.checkInitiatorAndTargetCreationRequirements(testCase.podIP, testCase.initiatorIP, testCase.targetIP) + + c.Assert(initiatorCreationRequired, Equals, testCase.expectedInitiatorCreationRequired, + Commentf("Test case '%s': unexpected initiator creation requirement", testCase.name)) + c.Assert(targetCreationRequired, Equals, testCase.expectedTargetCreationRequired, + Commentf("Test case '%s': unexpected target creation requirement", testCase.name)) + c.Assert(err, DeepEquals, testCase.expectedError, + Commentf("Test case '%s': unexpected error result", testCase.name)) + } +} + +func (s *TestSuite) TestIsNewEngine(c *C) { + testCases := []struct { + name string + engine *Engine + expected bool + }{ + { + name: "New engine with empty IP and TargetIP and StandbyTargetPort 0", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: true, + }, + { + name: "Engine with non-empty IP", + engine: &Engine{ + IP: "192.168.1.1", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-empty TargetIP", + engine: &Engine{ + IP: "", + TargetIP: "192.168.1.2", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-zero StandbyTargetPort", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 8080, + }, + expected: false, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing isNewEngine.%v", testName) + result := testCase.engine.isNewEngine() + c.Assert(result, Equals, testCase.expected, Commentf("Test case '%s': unexpected result", testCase.name)) + } +} + +func (s *TestSuite) TestReleaseTargetAndStandbyTargetPorts(c *C) { + testCases := []struct { + name string + engine *Engine + expectedTargetPort int32 + expectedStandbyTargetPort int32 + expectedError error + }{ + { + name: "Release both target and standby target ports", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2005, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target port only but standby target port is not set", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 0, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target and standby ports when they are the same", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release snapshot target port only", + engine: &Engine{ + TargetPort: 0, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing releaseTargetAndStandbyTargetPorts.%v", testName) + + bitmap, err := commonbitmap.NewBitmap(0, 100000) + c.Assert(err, IsNil) + + err = testCase.engine.releaseTargetAndStandbyTargetPorts(bitmap) + c.Assert(err, DeepEquals, testCase.expectedError, Commentf("Test case '%s': unexpected error result", testCase.name)) + c.Assert(testCase.engine.TargetPort, Equals, testCase.expectedTargetPort, Commentf("Test case '%s': unexpected target port", testCase.name)) + c.Assert(testCase.engine.StandbyTargetPort, Equals, testCase.expectedStandbyTargetPort, Commentf("Test case '%s': unexpected standby target port", testCase.name)) + } +} diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index f079257f..b64d1db3 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -878,7 +878,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 1123fc6f..cc6e243c 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1375,77 +1375,3 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { err = spdkCli.ReplicaDelete(replicaName2, false) c.Assert(err, IsNil) } - -func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { - fmt.Println("Testing SPDK Engine Creation with Upgrade Required") - - diskDriverName := "aio" - - ip, err := commonnet.GetAnyExternalIP() - c.Assert(err, IsNil) - os.Setenv(commonnet.EnvPodIP, ip) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ne, err := helperutil.NewExecutor(commontypes.ProcDirectory) - c.Assert(err, IsNil) - LaunchTestSPDKGRPCServer(ctx, c, ip, ne.Execute) - - loopDevicePath := PrepareDiskFile(c) - defer func() { - CleanupDiskFile(c, loopDevicePath) - }() - - spdkCli, err := client.NewSPDKClient(net.JoinHostPort(ip, strconv.Itoa(types.SPDKServicePort))) - c.Assert(err, IsNil) - - disk, err := spdkCli.DiskCreate(defaultTestDiskName, "", loopDevicePath, diskDriverName, int64(defaultTestBlockSize)) - c.Assert(err, IsNil) - c.Assert(disk.Path, Equals, loopDevicePath) - c.Assert(disk.Uuid, Not(Equals), "") - - defer func() { - err := spdkCli.DiskDelete(defaultTestDiskName, disk.Uuid, disk.Path, diskDriverName) - c.Assert(err, IsNil) - }() - - volumeName := "test-vol" - engineName := fmt.Sprintf("%s-engine", volumeName) - replicaName1 := fmt.Sprintf("%s-replica-1", volumeName) - replicaName2 := fmt.Sprintf("%s-replica-2", volumeName) - - replica1, err := spdkCli.ReplicaCreate(replicaName1, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - replica2, err := spdkCli.ReplicaCreate(replicaName2, defaultTestDiskName, disk.Uuid, defaultTestLvolSize, defaultTestReplicaPortCount) - c.Assert(err, IsNil) - - replicaAddressMap := map[string]string{ - replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), - replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), - } - - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) - c.Assert(err, IsNil) - - targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) - c.Assert(err, IsNil) - c.Assert(engine.Endpoint, Not(Equals), "") - // Initiator is not created, so the IP and Port should be empty - c.Assert(engine.IP, Equals, ip) - c.Assert(engine.Port, Not(Equals), int32(0)) - // Target is created and exposed - c.Assert(engine.TargetIP, Equals, ip) - c.Assert(engine.TargetPort, Not(Equals), int32(0)) - c.Assert(engine.Port, Equals, engine.TargetPort) - - // Tear down engine and replicas - err = spdkCli.EngineDelete(engineName) - c.Assert(err, IsNil) - - err = spdkCli.ReplicaDelete(replicaName1, false) - c.Assert(err, IsNil) - err = spdkCli.ReplicaDelete(replicaName2, false) - c.Assert(err, IsNil) -}