From a8c7230e99c4070a8b6f42558773e99ae76ac296 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Tue, 15 Oct 2024 10:22:03 +0800 Subject: [PATCH 1/7] refactor: replica rebuilding dst cleanup returns error Longhorn 9488 Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index f86bf995..de4c9dca 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -781,7 +781,7 @@ func (r *Replica) Delete(spdkClient *spdkclient.Client, cleanupRequired bool, su // Clean up the rebuilding cached info first r.doCleanupForRebuildingSrc(spdkClient) - r.doCleanupForRebuildingDst(spdkClient, false) + _ = r.doCleanupForRebuildingDst(spdkClient, false) if r.isRebuilding { r.rebuildingDstCache.rebuildingError = "replica is being deleted" r.rebuildingDstCache.rebuildingState = types.ProgressStateError @@ -826,7 +826,7 @@ func (r *Replica) Delete(spdkClient *spdkclient.Client, cleanupRequired bool, su CleanupLvolTree(spdkClient, r.ActiveChain[1].Name, bdevLvolMap, r.log) } // Clean up the possible rebuilding leftovers - r.doCleanupForRebuildingDst(spdkClient, true) + _ = r.doCleanupForRebuildingDst(spdkClient, true) r.log.Info("Deleted replica") @@ -1551,7 +1551,7 @@ func (r *Replica) RebuildingDstFinish(spdkClient *spdkclient.Client) (err error) } } - r.doCleanupForRebuildingDst(spdkClient, r.rebuildingDstCache.rebuildingState == types.ProgressStateError) + _ = r.doCleanupForRebuildingDst(spdkClient, r.rebuildingDstCache.rebuildingState == types.ProgressStateError) replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { var lvolName string @@ -1574,10 +1574,12 @@ func (r *Replica) RebuildingDstFinish(spdkClient *spdkclient.Client) (err error) return nil } -func (r *Replica) doCleanupForRebuildingDst(spdkClient *spdkclient.Client, cleanupRebuildingLvolTree bool) { +func (r *Replica) doCleanupForRebuildingDst(spdkClient *spdkclient.Client, cleanupRebuildingLvolTree bool) error { + aggregatedErrors := []error{} if r.rebuildingDstCache.srcReplicaAddress != "" { if err := disconnectNVMfBdev(spdkClient, r.rebuildingDstCache.externalSnapshotBdevName); err != nil { r.log.WithError(err).Errorf("Failed to disconnect the external src snapshot bdev %s for rebuilding dst cleanup, will continue", r.rebuildingDstCache.externalSnapshotBdevName) + aggregatedErrors = append(aggregatedErrors, err) } else { r.rebuildingDstCache.srcReplicaAddress = "" } @@ -1586,41 +1588,50 @@ func (r *Replica) doCleanupForRebuildingDst(spdkClient *spdkclient.Client, clean // Blindly clean up the rebuilding lvol and the exposed port rebuildingLvolName := GetReplicaRebuildingLvolName(r.Name) if r.rebuildingDstCache.rebuildingLvol != nil && r.rebuildingDstCache.rebuildingLvol.Name != rebuildingLvolName { - r.log.Errorf("BUG: replica %s rebuilding lvol actual name %s does not match the expected name %v, will use the actual name for the cleanup", r.Name, r.rebuildingDstCache.rebuildingLvol.Name, rebuildingLvolName) + err := fmt.Errorf("BUG: replica %s rebuilding lvol actual name %s does not match the expected name %v, will use the actual name for the cleanup", r.Name, r.rebuildingDstCache.rebuildingLvol.Name, rebuildingLvolName) + r.log.Error(err) + aggregatedErrors = append(aggregatedErrors, err) rebuildingLvolName = r.rebuildingDstCache.rebuildingLvol.Name } if err := spdkClient.StopExposeBdev(helpertypes.GetNQN(rebuildingLvolName)); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { r.log.WithError(err).Errorf("Failed to stop exposing the rebuilding lvol %s for rebuilding dst cleanup, will continue", rebuildingLvolName) + aggregatedErrors = append(aggregatedErrors, err) } if r.rebuildingDstCache.rebuildingPort != 0 { if err := r.portAllocator.ReleaseRange(r.rebuildingDstCache.rebuildingPort, r.rebuildingDstCache.rebuildingPort); err != nil { r.log.WithError(err).Errorf("Failed to release the rebuilding port %d for rebuilding dst cleanup, will continue", r.rebuildingDstCache.rebuildingPort) + aggregatedErrors = append(aggregatedErrors, err) } else { r.rebuildingDstCache.rebuildingPort = 0 } } if _, err := spdkClient.BdevLvolDelete(spdktypes.GetLvolAlias(r.LvsName, rebuildingLvolName)); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { r.log.WithError(err).Errorf("Failed to delete the rebuilding lvol %s for rebuilding dst cleanup, will continue", rebuildingLvolName) + aggregatedErrors = append(aggregatedErrors, err) } else { r.rebuildingDstCache.rebuildingLvol = nil } // Mainly for rebuilding failed case if cleanupRebuildingLvolTree && len(r.rebuildingDstCache.processedSnapshotList) > 0 { - var err error + allLvolsCleaned := true // Do cleanup in a reverse order to avoid trying to delete a snapshot lvol with multiple children for idx := len(r.rebuildingDstCache.processedSnapshotList) - 1; idx >= 0; idx-- { snapLvolAlias := spdktypes.GetLvolAlias(r.LvsName, GetReplicaSnapshotLvolName(r.Name, r.rebuildingDstCache.processedSnapshotList[idx])) - if _, err = spdkClient.BdevLvolDelete(snapLvolAlias); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + if _, err := spdkClient.BdevLvolDelete(snapLvolAlias); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + allLvolsCleaned = false r.log.WithError(err).Errorf("failed to delete rebuilt snapshot lvol %v for rebuilding dst cleanup, will continue", snapLvolAlias) + aggregatedErrors = append(aggregatedErrors, err) } } - if err == nil { + if allLvolsCleaned { r.rebuildingDstCache.processedSnapshotList = make([]string, 0) } } r.rebuildingDstCache.rebuildingSnapshotMap = map[string]*api.Lvol{} + + return util.CombineErrors(aggregatedErrors...) } // RebuildingDstShallowCopyStart let the dst replica ask the src replica to start a shallow copy from a snapshot to the rebuilding lvol. From 686f19128fe0d3d7e5b2c9bc72694b98d2b6a8b6 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Tue, 15 Oct 2024 10:25:01 +0800 Subject: [PATCH 2/7] feat: replica cleans up the previous rebuilding src info before starting a new rebuilding Longhorn 9488 Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index de4c9dca..7a2e7195 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -1408,9 +1408,11 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa return "", fmt.Errorf("invalid chain length %d for dst replica %v rebuilding start", r.ChainLength, r.Name) } - // TODO: Need to do cleanup rather than directly error out if we want to reuse a rebuilding failed replica + // Replica.Delete and Replica.Create do not guarantee that the previous rebuilding src replica info is cleaned up if r.rebuildingDstCache.srcReplicaName != "" || r.rebuildingDstCache.srcReplicaAddress != "" || r.rebuildingDstCache.externalSnapshotName != "" || r.rebuildingDstCache.externalSnapshotBdevName != "" { - return "", fmt.Errorf("found non-emtpy src replica %s, src replica address %s, external snapshot name %s, or external snapshot bdev name %s for dst replica rebuilding start, maybe the leftover of the previous rebuilding failure", r.rebuildingDstCache.srcReplicaName, r.rebuildingDstCache.srcReplicaAddress, r.rebuildingDstCache.externalSnapshotName, r.rebuildingDstCache.externalSnapshotBdevName) + if err := r.doCleanupForRebuildingDst(spdkClient, false); err != nil { + return "", fmt.Errorf("failed to clean up the previous src replica info for dst replica rebuilding start, src replica name %s, address %s, external snapshot name %s, or external snapshot bdev name %s", r.rebuildingDstCache.srcReplicaName, r.rebuildingDstCache.srcReplicaAddress, r.rebuildingDstCache.externalSnapshotName, r.rebuildingDstCache.externalSnapshotBdevName) + } } r.rebuildingDstCache.srcReplicaName = srcReplicaName r.rebuildingDstCache.srcReplicaAddress = srcReplicaAddress From 4f4515f81a35683aa471d9c99e2452ba8fe803b5 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Fri, 18 Oct 2024 10:32:57 +0800 Subject: [PATCH 3/7] refactor: replica caches Head Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 269 ++++++++++++++++++++++++-------------------- pkg/spdk/server.go | 2 +- pkg/spdk/types.go | 3 + 3 files changed, 154 insertions(+), 120 deletions(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 7a2e7195..4eeee46a 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -41,12 +41,16 @@ type Replica struct { ctx context.Context + // Head should be the only writable lvol in the regular Replica lvol chain/map. + // And it is the last entry of ActiveChain if it is not nil. + Head *Lvol // ActiveChain stores the backing image info in index 0. // If a replica does not contain a backing image, the first entry will be nil. - // The last entry of the chain is always the head lvol. + // The last entry of the chain should be the head lvol if it exists. ActiveChain []*Lvol // ChainLength typically has length no less than 2. // Since the first and last entries of ActiveChain are the backing image and the head, respectively. + // If the head does not exist, the last entry of ActiveChain will be a snapshot or the backing image. ChainLength int // SnapshotLvolMap map[]. consists of `-snap-` SnapshotLvolMap map[string]*Lvol @@ -131,7 +135,7 @@ func ServiceReplicaToProtoReplica(r *Replica) *spdkrpc.Replica { ErrorMsg: r.ErrorMsg, } - res.Head = ServiceLvolToProtoLvol(r.Name, r.ActiveChain[r.ChainLength-1]) + res.Head = ServiceLvolToProtoLvol(r.Name, r.Head) // spdkrpc.Replica.Snapshots is map[] rather than map[] for lvolName, lvol := range r.SnapshotLvolMap { res.Snapshots[GetSnapshotNameFromReplicaSnapshotLvolName(r.Name, lvolName)] = ServiceLvolToProtoLvol(r.Name, lvol) @@ -156,15 +160,9 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS return &Replica{ ctx: ctx, + Head: nil, ActiveChain: []*Lvol{ nil, - { - Name: replicaName, - Alias: spdktypes.GetLvolAlias(lvsName, replicaName), - SpecSize: roundedSpecSize, - ActualSize: actualSize, - Children: map[string]*Lvol{}, - }, }, ChainLength: 2, SnapshotLvolMap: map[string]*Lvol{}, @@ -189,16 +187,6 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS } } -func (r *Replica) GetVolumeHead() *Lvol { - r.RLock() - defer r.RUnlock() - - if r.ChainLength < 2 { - return nil - } - return r.ActiveChain[r.ChainLength-1] -} - func (r *Replica) Sync(spdkClient *spdkclient.Client) (err error) { r.Lock() defer r.Unlock() @@ -255,7 +243,7 @@ func (r *Replica) construct(bdevLvolMap map[string]*spdktypes.BdevInfo) (err err return fmt.Errorf("invalid state %s with rebuilding %v for replica %s construct", r.State, r.isRebuilding, r.Name) } - if err := r.validateReplicaInfo(bdevLvolMap[r.Name]); err != nil { + if err := r.validateReplicaHead(bdevLvolMap[r.Name]); err != nil { return err } @@ -268,9 +256,11 @@ func (r *Replica) construct(bdevLvolMap map[string]*spdktypes.BdevInfo) (err err return err } + r.Head = newChain[len(newChain)-1] r.ActiveChain = newChain r.ChainLength = len(r.ActiveChain) r.SnapshotLvolMap = newSnapshotLvolMap + if r.State == types.InstanceStatePending { r.State = types.InstanceStateStopped } @@ -303,7 +293,7 @@ func (r *Replica) validateAndUpdate(bdevLvolMap map[string]*spdktypes.BdevInfo, return nil } - if err := r.validateReplicaInfo(bdevLvolMap[r.Name]); err != nil { + if err := r.validateReplicaHead(bdevLvolMap[r.Name]); err != nil { return err } @@ -444,7 +434,7 @@ func getExposedPort(subsystem *spdktypes.NvmfSubsystem) (exposedPort int32, err return 0, fmt.Errorf("cannot find a exposed port in the Nvmf subsystem") } -func (r *Replica) validateReplicaInfo(headBdevLvol *spdktypes.BdevInfo) (err error) { +func (r *Replica) validateReplicaHead(headBdevLvol *spdktypes.BdevInfo) (err error) { if headBdevLvol == nil { return fmt.Errorf("found nil head bdev lvol for replica %s", r.Name) } @@ -462,6 +452,89 @@ func (r *Replica) validateReplicaInfo(headBdevLvol *spdktypes.BdevInfo) (err err return nil } +func (r *Replica) IsHeadAvailable(spdkClient *spdkclient.Client) (isAvailable bool, err error) { + defer func() { + if err != nil || isAvailable { + return + } + r.Head = nil + if r.ActiveChain[len(r.ActiveChain)-1] != nil && + r.ActiveChain[len(r.ActiveChain)-1].Name == r.Name { + r.ActiveChain = r.ActiveChain[:len(r.ActiveChain)-1] + } + }() + + if len(r.ActiveChain) < 2 { + return false, nil + } + if r.Head == nil { + return false, nil + } + + bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0) + if err != nil { + return false, err + } + if len(bdevLvolList) < 1 { + return false, nil + } + + return true, nil +} + +func (r *Replica) updateHeadCache(spdkClient *spdkclient.Client) (err error) { + bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0) + if err != nil { + return err + } + if len(bdevLvolList) < 1 { + return fmt.Errorf("cannot find head lvol %v for the cache update", r.Alias) + } + + r.Head = BdevLvolInfoToServiceLvol(&bdevLvolList[0]) + + if len(r.ActiveChain) == 1 || (r.ActiveChain[len(r.ActiveChain)-1] != nil && r.ActiveChain[len(r.ActiveChain)-1].Name != r.Name) { + r.ActiveChain = append(r.ActiveChain, r.Head) + } else { + r.ActiveChain[len(r.ActiveChain)-1] = r.Head + } + if r.ActiveChain[len(r.ActiveChain)-2] != nil { + if r.ActiveChain[len(r.ActiveChain)-2].Name != r.Head.Parent { + return fmt.Errorf("found the last entry of the active chain %v is not the head parent %v", r.ActiveChain[len(r.ActiveChain)-2].Name, r.Head.Parent) + } + r.ActiveChain[len(r.ActiveChain)-2].Children[r.Head.Name] = r.Head + } + + return nil +} + +func (r *Replica) prepareHead(spdkClient *spdkclient.Client) (err error) { + isHeadAvailable, err := r.IsHeadAvailable(spdkClient) + if err != nil { + return err + } + + if !isHeadAvailable { + r.log.Info("Creating a lvol bdev as replica Head") + if r.ActiveChain[len(r.ActiveChain)-1] != nil { // The replica has a backing image or somehow there are already snapshots in the chain + if _, err := spdkClient.BdevLvolClone(r.ActiveChain[len(r.ActiveChain)-1].UUID, r.Name); err != nil { + return err + } + if r.ActiveChain[len(r.ActiveChain)-1].SpecSize != r.SpecSize { + if _, err := spdkClient.BdevLvolResize(r.Alias, r.SpecSize); err != nil { + return err + } + } + } else { + if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { + return err + } + } + } + + return r.updateHeadCache(spdkClient) +} + // getRootLvolName relies on the lvol name to identify if a lvol belongs to the replica, // then figuring out whether it is the root by checking the parent func getRootLvolName(replicaName string, bdevLvolMap map[string]*spdktypes.BdevInfo) (rootLvolName string) { @@ -607,85 +680,40 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } }() - if r.ChainLength < 2 { - return nil, fmt.Errorf("invalid chain length %d for replica creation", r.ChainLength) - } - headSvcLvol := r.ActiveChain[r.ChainLength-1] - - if headSvcLvol.UUID == "" && r.State == types.InstanceStateStopped { - r.log.Debugf("Updating replica %s state from %v to %v because headSvcLvol.UUID is empty", r.Name, r.State, types.InstanceStatePending) - r.State = types.InstanceStatePending - } - // Create bdev lvol if the replica is the new one if r.State == types.InstanceStatePending { - var lvsList []spdktypes.LvstoreInfo - if r.LvsUUID != "" { - lvsList, err = spdkClient.BdevLvolGetLvstore("", r.LvsUUID) - } else if r.LvsName != "" { - lvsList, err = spdkClient.BdevLvolGetLvstore(r.LvsName, "") - } - if err != nil { - return nil, err - } - if len(lvsList) != 1 { - return nil, fmt.Errorf("found zero or multiple lvstore with name %s and UUID %s during replica %s creation", r.LvsName, r.LvsUUID, r.Name) - } - if r.LvsName == "" { - r.LvsName = lvsList[0].Name - } - if r.LvsUUID == "" { - r.LvsUUID = lvsList[0].UUID - } - if r.LvsName != lvsList[0].Name || r.LvsUUID != lvsList[0].UUID { - return nil, fmt.Errorf("found mismatching between the actual lvstore name %s with UUID %s and the recorded lvstore name %s with UUID %s during replica %s creation", lvsList[0].Name, lvsList[0].UUID, r.LvsName, r.LvsUUID, r.Name) - } - - bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0) - if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return nil, errors.Wrapf(err, "failed to check existence of lvol bdev for the new replica %v", r.Name) - } - - if len(bdevLvolList) == 0 { - r.log.Info("Creating a lvol bdev for the new replica") - if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { - return nil, err - } - bdevLvolList, err = spdkClient.BdevLvolGet(r.Alias, 0) - if err != nil { - return nil, err - } - } else { - r.log.Infof("Skipping creating a lvol bdev %v during replica creation because it already exists", r.Alias) - - replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { - var lvolName string - if len(bdev.Aliases) == 1 { - lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) - } - return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) - } - bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) - if err != nil { - return nil, err - } - - r.log.Infof("Constructing replica %v object during replica creation", r.Name) - err = r.construct(bdevLvolMap) - if err != nil { - return nil, err - } + if r.ChainLength != 1 { + return nil, fmt.Errorf("invalid chain length %d for new replica creation", r.ChainLength) } + } - if len(bdevLvolList) < 1 { - return nil, fmt.Errorf("cannot find lvol %v after creation", r.Alias) - } + var lvsList []spdktypes.LvstoreInfo + if r.LvsUUID != "" { + lvsList, err = spdkClient.BdevLvolGetLvstore("", r.LvsUUID) + } else if r.LvsName != "" { + lvsList, err = spdkClient.BdevLvolGetLvstore(r.LvsName, "") + } + if err != nil { + return nil, err + } + if len(lvsList) != 1 { + return nil, fmt.Errorf("found zero or multiple lvstore with name %s and UUID %s during replica %s creation", r.LvsName, r.LvsUUID, r.Name) + } + if r.LvsName == "" { + r.LvsName = lvsList[0].Name + } + if r.LvsUUID == "" { + r.LvsUUID = lvsList[0].UUID + } + if r.LvsName != lvsList[0].Name || r.LvsUUID != lvsList[0].UUID { + return nil, fmt.Errorf("found mismatching between the actual lvstore name %s with UUID %s and the recorded lvstore name %s with UUID %s during replica %s creation", lvsList[0].Name, lvsList[0].UUID, r.LvsName, r.LvsUUID, r.Name) + } - headSvcLvol.UUID = bdevLvolList[0].UUID - headSvcLvol.CreationTime = bdevLvolList[0].CreationTime - headSvcLvol.ActualSize = bdevLvolList[0].DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize - r.State = types.InstanceStateStopped + // A stopped replica may be a broken one. We need to make sure the head lvol is ready first. + if err := r.prepareHead(spdkClient); err != nil { + return nil, err } + r.State = types.InstanceStateStopped podIP, err := commonnet.GetIPForPod() if err != nil { @@ -712,9 +740,8 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } nguid := commonutils.RandomID(nvmeNguidLength) - - if err := spdkClient.StartExposeBdev(nqn, headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { - return nil, errors.Wrapf(err, "failed to expose replica %v", r.Name) + if err := spdkClient.StartExposeBdev(nqn, r.Head.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { + return nil, err } r.IsExposed = true r.State = types.InstanceStateRunning @@ -874,10 +901,9 @@ func (r *Replica) SnapshotCreate(spdkClient *spdkclient.Client, snapshotName str } }() - if r.ChainLength < 2 { - return nil, fmt.Errorf("invalid chain length %d for replica snapshot creation", r.ChainLength) + if r.Head == nil { + return nil, fmt.Errorf("nil head for replica snapshot creation") } - headSvcLvol := r.ActiveChain[r.ChainLength-1] var xattrs []spdkclient.Xattr if opts != nil { @@ -894,7 +920,7 @@ func (r *Replica) SnapshotCreate(spdkClient *spdkclient.Client, snapshotName str xattrs = append(xattrs, snapshotTimestamp) } - snapUUID, err := spdkClient.BdevLvolSnapshot(headSvcLvol.UUID, snapLvolName, xattrs) + snapUUID, err := spdkClient.BdevLvolSnapshot(r.Head.UUID, snapLvolName, xattrs) if err != nil { return nil, err } @@ -904,23 +930,30 @@ func (r *Replica) SnapshotCreate(spdkClient *spdkclient.Client, snapshotName str return nil, err } if len(bdevLvolList) != 1 { - return nil, fmt.Errorf("zero or multiple snap lvols with UUID %s found after lvol snapshot", snapUUID) + return nil, fmt.Errorf("zero or multiple snap lvols with UUID %s found after lvol snapshot create", snapUUID) } - snapSvcLvol := BdevLvolInfoToServiceLvol(&bdevLvolList[0]) - snapSvcLvol.Children[headSvcLvol.Name] = headSvcLvol + + bdevLvolList, err = spdkClient.BdevLvolGet(r.Head.Alias, 0) + if err != nil { + return nil, err + } + if len(bdevLvolList) != 1 { + return nil, fmt.Errorf("zero or multiple head lvols with UUID %s found after lvol snapshot create", snapUUID) + } + r.Head = BdevLvolInfoToServiceLvol(&bdevLvolList[0]) + snapSvcLvol.Children[r.Head.Name] = r.Head // Already contain a valid snapshot lvol or backing image lvol before this snapshot creation - if r.ActiveChain[r.ChainLength-2] != nil { - prevSvcLvol := r.ActiveChain[r.ChainLength-2] - delete(prevSvcLvol.Children, headSvcLvol.Name) + if len(r.ActiveChain) > 1 && r.ActiveChain[len(r.ActiveChain)-2] != nil { + prevSvcLvol := r.ActiveChain[len(r.ActiveChain)-2] + delete(prevSvcLvol.Children, r.Head.Name) prevSvcLvol.Children[snapSvcLvol.Name] = snapSvcLvol } r.ActiveChain[r.ChainLength-1] = snapSvcLvol - r.ActiveChain = append(r.ActiveChain, headSvcLvol) + r.ActiveChain = append(r.ActiveChain, r.Head) r.ChainLength++ r.SnapshotLvolMap[snapLvolName] = snapSvcLvol - headSvcLvol.Parent = snapSvcLvol.Name updateRequired = true r.log.Infof("Replica created snapshot %s(%s)", snapshotName, snapSvcLvol.Alias) @@ -967,10 +1000,6 @@ func (r *Replica) SnapshotDelete(spdkClient *spdkclient.Client, snapshotName str } }() - if r.ChainLength < 2 { - return nil, fmt.Errorf("invalid chain length %d for replica snapshot delete", r.ChainLength) - } - if _, err := spdkClient.BdevLvolDelete(snapSvcLvol.UUID); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return nil, err } @@ -1082,6 +1111,7 @@ func (r *Replica) SnapshotRevert(spdkClient *spdkclient.Client, snapshotName str if r.ActiveChain[r.ChainLength-2] != nil { delete(r.ActiveChain[r.ChainLength-2].Children, r.Name) } + r.Head = nil r.ChainLength-- r.ActiveChain = r.ActiveChain[:r.ChainLength] @@ -1113,6 +1143,7 @@ func (r *Replica) SnapshotRevert(spdkClient *spdkclient.Client, snapshotName str return nil, err } + r.Head = newChain[len(newChain)-1] r.ActiveChain = newChain r.ChainLength = len(r.ActiveChain) r.SnapshotLvolMap = newSnapshotLvolMap @@ -1455,13 +1486,13 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa return "", err } if len(bdevLvolList) != 1 { - return "", fmt.Errorf("zero or multiple snap lvols with UUID %s found after rebuilding dst head %s creation", headLvolUUID, r.Name) + return "", fmt.Errorf("zero or multiple head lvols with UUID %s found after rebuilding dst head %s creation", headLvolUUID, r.Name) } - headSvcLvol := BdevLvolInfoToServiceLvol(&bdevLvolList[0]) - r.ActiveChain[1] = headSvcLvol + r.Head = BdevLvolInfoToServiceLvol(&bdevLvolList[0]) + r.ActiveChain[1] = r.Head nguid := commonutils.RandomID(nvmeNguidLength) - if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), headSvcLvol.UUID, nguid, r.IP, strconv.Itoa(int(r.PortStart))); err != nil { + if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), r.Head.UUID, nguid, r.IP, strconv.Itoa(int(r.PortStart))); err != nil { return "", err } r.IsExposed = true @@ -1483,7 +1514,7 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa r.isRebuilding = true - r.log.Infof("Replica created a new head %s(%s) based on the external snapshot %s(%s) from healthy replica %s for rebuilding start", headSvcLvol.Alias, dstHeadLvolAddress, externalSnapshotName, externalSnapshotAddress, srcReplicaName) + r.log.Infof("Replica created a new head %s(%s) based on the external snapshot %s(%s) from healthy replica %s for rebuilding start", r.Head.Alias, dstHeadLvolAddress, externalSnapshotName, externalSnapshotAddress, srcReplicaName) return dstHeadLvolAddress, nil } diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index 2e40fec8..87ed83cd 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -254,7 +254,7 @@ func (s *Server) verify() (err error) { replicaMapForSync[lvolName] = replicaMap[lvolName] } for replicaName, r := range replicaMap { - headSvcLvol := r.GetVolumeHead() + headSvcLvol := r.Head if headSvcLvol == nil { delete(replicaMap, replicaName) continue diff --git a/pkg/spdk/types.go b/pkg/spdk/types.go index d0bb3dfb..2cd20f06 100644 --- a/pkg/spdk/types.go +++ b/pkg/spdk/types.go @@ -47,6 +47,9 @@ type Lvol struct { } func ServiceLvolToProtoLvol(replicaName string, lvol *Lvol) *spdkrpc.Lvol { + if lvol == nil { + return nil + } res := &spdkrpc.Lvol{ Uuid: lvol.UUID, SpecSize: lvol.SpecSize, From 9a8ea35f4ecf883b815a2f1f2ea812180b318dd4 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Fri, 18 Oct 2024 12:30:48 +0800 Subject: [PATCH 4/7] refactor: remove r.ChainLength for simplicity Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 42 ++++++++++++++++-------------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 4eeee46a..d364f6b0 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -48,10 +48,6 @@ type Replica struct { // If a replica does not contain a backing image, the first entry will be nil. // The last entry of the chain should be the head lvol if it exists. ActiveChain []*Lvol - // ChainLength typically has length no less than 2. - // Since the first and last entries of ActiveChain are the backing image and the head, respectively. - // If the head does not exist, the last entry of ActiveChain will be a snapshot or the backing image. - ChainLength int // SnapshotLvolMap map[]. consists of `-snap-` SnapshotLvolMap map[string]*Lvol @@ -164,7 +160,6 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS ActiveChain: []*Lvol{ nil, }, - ChainLength: 2, SnapshotLvolMap: map[string]*Lvol{}, Name: replicaName, Alias: spdktypes.GetLvolAlias(lvsName, replicaName), @@ -258,7 +253,6 @@ func (r *Replica) construct(bdevLvolMap map[string]*spdktypes.BdevInfo) (err err r.Head = newChain[len(newChain)-1] r.ActiveChain = newChain - r.ChainLength = len(r.ActiveChain) r.SnapshotLvolMap = newSnapshotLvolMap if r.State == types.InstanceStatePending { @@ -343,7 +337,7 @@ func (r *Replica) validateAndUpdate(bdevLvolMap map[string]*spdktypes.BdevInfo, } } - replicaActualSize := newChain[r.ChainLength-1].ActualSize + replicaActualSize := newChain[len(newChain)-1].ActualSize for _, snapLvol := range newSnapshotLvolMap { replicaActualSize += snapLvol.ActualSize } @@ -682,8 +676,8 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio // Create bdev lvol if the replica is the new one if r.State == types.InstanceStatePending { - if r.ChainLength != 1 { - return nil, fmt.Errorf("invalid chain length %d for new replica creation", r.ChainLength) + if len(r.ActiveChain) != 1 { + return nil, fmt.Errorf("invalid chain length %d for new replica creation", len(r.ActiveChain)) } } @@ -950,9 +944,8 @@ func (r *Replica) SnapshotCreate(spdkClient *spdkclient.Client, snapshotName str delete(prevSvcLvol.Children, r.Head.Name) prevSvcLvol.Children[snapSvcLvol.Name] = snapSvcLvol } - r.ActiveChain[r.ChainLength-1] = snapSvcLvol + r.ActiveChain[len(r.ActiveChain)-1] = snapSvcLvol r.ActiveChain = append(r.ActiveChain, r.Head) - r.ChainLength++ r.SnapshotLvolMap[snapLvolName] = snapSvcLvol updateRequired = true @@ -1062,11 +1055,10 @@ func (r *Replica) removeLvolFromActiveChainWithoutLock(snapLvolName string) int // Cannot remove backing image lvol or head lvol prevChain := r.ActiveChain - if pos >= 1 && pos < r.ChainLength-1 { + if pos >= 1 && pos < len(r.ActiveChain)-1 { r.ActiveChain = append([]*Lvol{}, prevChain[:pos]...) r.ActiveChain = append(r.ActiveChain, prevChain[pos+1:]...) } - r.ChainLength = len(r.ActiveChain) return pos } @@ -1100,20 +1092,19 @@ func (r *Replica) SnapshotRevert(spdkClient *spdkclient.Client, snapshotName str } }() - if r.ChainLength < 2 { - return nil, fmt.Errorf("invalid chain length %d for replica snapshot revert", r.ChainLength) + if len(r.ActiveChain) < 2 { + return nil, fmt.Errorf("invalid chain length %d for replica snapshot revert", len(r.ActiveChain)) } if _, err := spdkClient.BdevLvolDelete(r.Alias); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { return nil, err } // The parent of the old head lvol is a valid snapshot lvol or backing image lvol - if r.ActiveChain[r.ChainLength-2] != nil { - delete(r.ActiveChain[r.ChainLength-2].Children, r.Name) + if r.ActiveChain[len(r.ActiveChain)-2] != nil { + delete(r.ActiveChain[len(r.ActiveChain)-2].Children, r.Name) } r.Head = nil - r.ChainLength-- - r.ActiveChain = r.ActiveChain[:r.ChainLength] + r.ActiveChain = r.ActiveChain[:len(r.ActiveChain)-1] // TODO: If the below steps fail, there will be no head lvol for the replica. Need to guarantee that the replica can be cleaned up correctly in this case @@ -1145,7 +1136,6 @@ func (r *Replica) SnapshotRevert(spdkClient *spdkclient.Client, snapshotName str r.Head = newChain[len(newChain)-1] r.ActiveChain = newChain - r.ChainLength = len(r.ActiveChain) r.SnapshotLvolMap = newSnapshotLvolMap if r.IsExposed { @@ -1188,8 +1178,8 @@ func (r *Replica) SnapshotPurge(spdkClient *spdkclient.Client) (err error) { } }() - if r.ChainLength < 2 { - return fmt.Errorf("invalid chain length %d for replica snapshot purge", r.ChainLength) + if len(r.ActiveChain) < 2 { + return fmt.Errorf("invalid chain length %d for replica snapshot purge", len(r.ActiveChain)) } // delete all non-user-created snapshots @@ -1435,8 +1425,8 @@ func (r *Replica) RebuildingDstStart(spdkClient *spdkclient.Client, srcReplicaNa updateRequired = true }() - if r.ChainLength != 2 { - return "", fmt.Errorf("invalid chain length %d for dst replica %v rebuilding start", r.ChainLength, r.Name) + if len(r.ActiveChain) != 2 { + return "", fmt.Errorf("invalid chain length %d for dst replica %v rebuilding start", len(r.ActiveChain), r.Name) } // Replica.Delete and Replica.Create do not guarantee that the previous rebuilding src replica info is cleaned up @@ -1562,8 +1552,8 @@ func (r *Replica) RebuildingDstFinish(spdkClient *spdkclient.Client) (err error) updateRequired = true }() - if r.ChainLength < 2 { - return fmt.Errorf("invalid chain length %d for dst replica %v rebuilding finish", r.ChainLength, r.Name) + if len(r.ActiveChain) < 2 { + return fmt.Errorf("invalid chain length %d for dst replica %v rebuilding finish", len(r.ActiveChain), r.Name) } // Switch from the external snapshot to use rebuilt snapshots From 21685f2ad331986b3fb45f9144bef7a91bfc43f5 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Mon, 21 Oct 2024 18:16:52 +0800 Subject: [PATCH 5/7] fix: server try the best to avoid eliminating broken replicas Signed-off-by: Shuo Wu --- pkg/spdk/server.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index 87ed83cd..a8e598df 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -3,6 +3,7 @@ package spdk import ( "fmt" "net" + "reflect" "strconv" "sync" "time" @@ -254,17 +255,24 @@ func (s *Server) verify() (err error) { replicaMapForSync[lvolName] = replicaMap[lvolName] } for replicaName, r := range replicaMap { - headSvcLvol := r.Head - if headSvcLvol == nil { - delete(replicaMap, replicaName) - continue - } - // TODO: How to handle a broken replica without a head lvol - if bdevLvolMap[headSvcLvol.Name] == nil { - delete(replicaMap, replicaName) - continue + // Try the best to avoid eliminating broken replicas without a head lvol + if bdevLvolMap[r.Name] == nil { + noReplicaLvol := true + for lvolName := range bdevLvolMap { + if IsReplicaLvol(r.Name, lvolName) { + noReplicaLvol = false + break + } + } + if noReplicaLvol { + delete(replicaMap, replicaName) + continue + } } } + if !reflect.DeepEqual(s.replicaMap, replicaMap) { + logrus.Infof("spdk gRPC server: Replica map updated, map count is changed from %d to %d", len(s.replicaMap), len(replicaMap)) + } s.replicaMap = replicaMap s.Unlock() From 9df3650f1f32625861f2f14538ecc13db4808551 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Tue, 22 Oct 2024 16:27:29 +0800 Subject: [PATCH 6/7] refactor: abstract replicaLvolFilter as a func Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 44 ++++++++++++-------------------------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index d364f6b0..22bd2ee3 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -182,20 +182,21 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS } } +func (r *Replica) replicaLvolFilter(bdev *spdktypes.BdevInfo) bool { + if bdev == nil || len(bdev.Aliases) < 1 || bdev.DriverSpecific.Lvol == nil { + return false + } + lvolName := spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) + return IsReplicaLvol(r.Name, lvolName) || (len(r.ActiveChain) > 0 && r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) +} + func (r *Replica) Sync(spdkClient *spdkclient.Client) (err error) { r.Lock() defer r.Unlock() // It's better to let the server send the update signal // This lvol and nvmf subsystem fetch should be protected by replica lock, in case of snapshot operations happened during the sync-up. - replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { - var lvolName string - if len(bdev.Aliases) == 1 { - lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) - } - return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) - } - bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, r.replicaLvolFilter) if err != nil { return err } @@ -832,14 +833,7 @@ func (r *Replica) Delete(spdkClient *spdkclient.Client, cleanupRequired bool, su // Clean up the valid snapshot tree if len(r.ActiveChain) > 1 { - replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { - var lvolName string - if len(bdev.Aliases) == 1 { - lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) - } - return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) - } - bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, r.replicaLvolFilter) if err != nil { return err } @@ -1113,14 +1107,7 @@ func (r *Replica) SnapshotRevert(spdkClient *spdkclient.Client, snapshotName str return nil, err } - replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { - var lvolName string - if len(bdev.Aliases) == 1 { - lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) - } - return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) - } - bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, r.replicaLvolFilter) if err != nil { return nil, err } @@ -1576,14 +1563,7 @@ func (r *Replica) RebuildingDstFinish(spdkClient *spdkclient.Client) (err error) _ = r.doCleanupForRebuildingDst(spdkClient, r.rebuildingDstCache.rebuildingState == types.ProgressStateError) - replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { - var lvolName string - if len(bdev.Aliases) == 1 { - lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) - } - return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) - } - bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, r.replicaLvolFilter) if err != nil { return err } From 35fce04b522da4553fc0c4758308f5199ffaa7b8 Mon Sep 17 00:00:00 2001 From: Shuo Wu Date: Wed, 6 Nov 2024 21:55:44 -0800 Subject: [PATCH 7/7] fix: avoid eliminating rebuilding replicas without heads Signed-off-by: Shuo Wu --- pkg/spdk/replica.go | 6 ++++++ pkg/spdk/server.go | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 22bd2ee3..d257c00b 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -182,6 +182,12 @@ func NewReplica(ctx context.Context, replicaName, lvsName, lvsUUID string, specS } } +func (r *Replica) IsRebuilding() bool { + r.RLock() + defer r.RUnlock() + return r.State == types.InstanceStateRunning && r.isRebuilding +} + func (r *Replica) replicaLvolFilter(bdev *spdktypes.BdevInfo) bool { if bdev == nil || len(bdev.Aliases) < 1 || bdev.DriverSpecific.Lvol == nil { return false diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index a8e598df..24222192 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -255,8 +255,11 @@ func (s *Server) verify() (err error) { replicaMapForSync[lvolName] = replicaMap[lvolName] } for replicaName, r := range replicaMap { - // Try the best to avoid eliminating broken replicas without a head lvol + // Try the best to avoid eliminating broken replicas or rebuilding replicas without head lvols if bdevLvolMap[r.Name] == nil { + if r.IsRebuilding() { + continue + } noReplicaLvol := true for lvolName := range bdevLvolMap { if IsReplicaLvol(r.Name, lvolName) {