diff --git a/pkg/client/client.go b/pkg/client/client.go index 0c3f2bcc..b67f20ea 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, + SalvageRequested: salvageRequested, }) if err != nil { return nil, errors.Wrap(err, "failed to start SPDK engine") diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 2e2f5c26..6c79c7ff 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -104,13 +104,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, + "salvageRequested": salvageRequested, }).Info("Creating engine") requireUpdate := true @@ -192,10 +193,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str initiatorCreationRequired = false } + if salvageRequested { + e.log.Info("Requesting salvage for engine replicas") + + replicaAddressMap, err = e.filterSalvageCandidates(replicaAddressMap) + if err != nil { + return nil, errors.Wrapf(err, "failed to update replica mode to filter salvage candidates") + } + } + for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } + bdevName, err := connectNVMfBdev(spdkClient, replicaName, replicaAddr) if err != nil { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) @@ -272,6 +283,80 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str return e.getWithoutLock(), nil } +// filterSalvageCandidates updates the replicaAddressMap by retaining only replicas +// eligible for salvage based on the largest volume head size. +// +// It iterates through all replicas and: +// - Retrieves the volume head size for each replica. +// - Identifies replicas with the largest volume head size as salvage candidates. +// - Remove the replicas that are not eligible as salvage candidates. +func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (map[string]string, error) { + // Initialize filteredCandidates to hold a copy of replicaAddressMap. + filteredCandidates := map[string]string{} + for key, value := range replicaAddressMap { + filteredCandidates[key] = value + } + + volumeHeadSizeToReplicaNames := map[uint64][]string{} + + // Collect volume head size for each replica. + for replicaName, replicaAddress := range replicaAddressMap { + func() { + // Get service client for the current replica. + replicaServiceCli, err := GetServiceClient(replicaAddress) + if err != nil { + e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress) + return + } + + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress) + } + }() + + // Retrieve replica information. + replica, err := replicaServiceCli.ReplicaGet(replicaName) + if err != nil { + e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica info", replicaName, replicaAddress) + delete(filteredCandidates, replicaName) + return + } + + // Map volume head size to replica names. + volumeHeadSizeToReplicaNames[replica.Head.ActualSize] = append(volumeHeadSizeToReplicaNames[replica.Head.ActualSize], replicaName) + }() + } + + // Sort the volume head sizes to find the largest. + volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames) + if err != nil { + return nil, errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size") + } + + if len(volumeHeadSizeSorted) == 0 { + return nil, errors.New("failed to find any salvage candidate with volume head size") + } + + // Determine salvage candidates with the largest volume head size. + largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1] + e.log.Infof("Selecting salvage candidates with the largest volume head size %v from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames) + + // Filter out replicas that do not match the largest volume head size. + salvageCandidates := volumeHeadSizeToReplicaNames[largestVolumeHeadSize] + for replicaName := range replicaAddressMap { + if !commonutils.Contains(salvageCandidates, replicaName) { + e.log.Infof("Skipping salvage for replica %s with address %s due to not having the largest volume head size (%v)", replicaName, replicaAddressMap[replicaName]) + delete(filteredCandidates, replicaName) + continue + } + + e.log.Infof("Including replica %s as a salvage candidate", replicaName) + } + + return filteredCandidates, nil +} + func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) @@ -1812,12 +1897,12 @@ func (e *Engine) waitForRestoreComplete() error { var err error for range periodicChecker.C { isReplicaRestoreCompleted := true - for replicaName, replicaAddress := range e.ReplicaAddressMap { - if e.ReplicaModeMap[replicaName] != types.ModeRW { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } - isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress) + isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaStatus.Address) if err != nil { return errors.Wrapf(err, "failed to check replica %s restore status", replicaName) } diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index e76e937a..f86bf995 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -612,6 +612,11 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } headSvcLvol := r.ActiveChain[r.ChainLength-1] + if headSvcLvol.UUID == "" && r.State == types.InstanceStateStopped { + r.log.Debugf("Updating replica %s state from %v to %v because headSvcLvol.UUID is empty", r.Name, r.State, types.InstanceStatePending) + r.State = types.InstanceStatePending + } + // Create bdev lvol if the replica is the new one if r.State == types.InstanceStatePending { var lvsList []spdktypes.LvstoreInfo @@ -636,17 +641,46 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio return nil, fmt.Errorf("found mismatching between the actual lvstore name %s with UUID %s and the recorded lvstore name %s with UUID %s during replica %s creation", lvsList[0].Name, lvsList[0].UUID, r.LvsName, r.LvsUUID, r.Name) } - r.log.Info("Creating a lvol bdev for the new replica") - if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { - return nil, err - } bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0) - if err != nil { - return nil, err + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to check existence of lvol bdev for the new replica %v", r.Name) + } + + if len(bdevLvolList) == 0 { + r.log.Info("Creating a lvol bdev for the new replica") + if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { + return nil, err + } + bdevLvolList, err = spdkClient.BdevLvolGet(r.Alias, 0) + if err != nil { + return nil, err + } + } else { + r.log.Infof("Skipping creating a lvol bdev %v during replica creation because it already exists", r.Alias) + + replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { + var lvolName string + if len(bdev.Aliases) == 1 { + lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) + } + return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) + } + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + if err != nil { + return nil, err + } + + r.log.Infof("Constructing replica %v object during replica creation", r.Name) + err = r.construct(bdevLvolMap) + if err != nil { + return nil, err + } } + if len(bdevLvolList) < 1 { return nil, fmt.Errorf("cannot find lvol %v after creation", r.Alias) } + headSvcLvol.UUID = bdevLvolList[0].UUID headSvcLvol.CreationTime = bdevLvolList[0].CreationTime headSvcLvol.ActualSize = bdevLvolList[0].DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize @@ -670,9 +704,17 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } r.portAllocator = bitmap + nqn := helpertypes.GetNQN(r.Name) + + // Blindly stop exposing the bdev if it exists. This is to avoid potential inconsistencies during salvage case. + if err := spdkClient.StopExposeBdev(nqn); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to stop expose replica %v", r.Name) + } + nguid := commonutils.RandomID(nvmeNguidLength) - if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { - return nil, err + + if err := spdkClient.StartExposeBdev(nqn, headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { + return nil, errors.Wrapf(err, "failed to expose replica %v", r.Name) } r.IsExposed = true r.State = types.InstanceStateRunning diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index 7d0b2658..2e40fec8 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 8ccbafba..1123fc6f 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1425,11 +1425,11 @@ func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) c.Assert(err, IsNil) targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Not(Equals), "") // Initiator is not created, so the IP and Port should be empty