Skip to content

Commit

Permalink
feat(v2/auto-salvage): filter salvage candidate by volume head size
Browse files Browse the repository at this point in the history
longhorn/longhorn-8430

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Sep 23, 2024
1 parent 470a9c0 commit 0d205b4
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin
}

func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32,
initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) {
initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) {
if name == "" || volumeName == "" || len(replicaAddressMap) == 0 {
return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters")
}
Expand All @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui
UpgradeRequired: upgradeRequired,
TargetAddress: targetAddress,
InitiatorAddress: initiatorAddress,
SalvageRequested: salvageRequested,
})
if err != nil {
return nil, errors.Wrap(err, "failed to start SPDK engine")
Expand Down
77 changes: 76 additions & 1 deletion pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}
}

func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) {
func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) {
logrus.WithFields(logrus.Fields{
"portCount": portCount,
"upgradeRequired": upgradeRequired,
"replicaAddressMap": replicaAddressMap,
"initiatorAddress": initiatorAddress,
"targetAddress": targetAddress,
"salvageRequested": salvageRequested,
}).Info("Creating engine")

requireUpdate := true
Expand Down Expand Up @@ -206,6 +207,14 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
e.ReplicaAddressMap = replicaAddressMap
e.log = e.log.WithField("replicaAddressMap", replicaAddressMap)

if salvageRequested {
e.log.Info("Requesting salvage for engine replicas")

if err := e.updateInfoToFilterReplicaSalvageCandidates(); err != nil {
return nil, err
}
}

e.checkAndUpdateInfoFromReplicaNoLock()

e.log.Infof("Connected all available replicas %+v, then launching raid during engine creation", e.ReplicaModeMap)
Expand Down Expand Up @@ -271,6 +280,72 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
return e.getWithoutLock(), nil
}

// updateInfoToFilterReplicaSalvageCandidates narrows the engine's replicas down to
// the ones eligible for salvage, using the volume head actual size as the criterion.
//
// It iterates through every entry of e.ReplicaAddressMap and:
//   - Skips the replicas whose mode is already ERR.
//   - Queries each remaining replica for its volume head actual size. A replica
//     whose service client cannot be created is skipped with a warning; a replica
//     whose ReplicaGet call fails is marked ERR immediately.
//   - Selects the replicas that share the largest volume head size as salvage
//     candidates.
//   - Marks every replica that is not a salvage candidate as mode ERR.
//
// It returns an error if the collected head sizes cannot be sorted, or if no
// replica reported a head size at all (so no salvage candidate exists). In the
// latter case the replica modes are left as they were after the query phase.
func (e *Engine) updateInfoToFilterReplicaSalvageCandidates() error {
	volumeHeadSizeToReplicaNames := map[uint64][]string{}
	for replicaName, replicaAddress := range e.ReplicaAddressMap {
		if e.ReplicaModeMap[replicaName] == types.ModeERR {
			e.log.Debugf("Skipping replica %s (mode %v) for salvage", replicaName, e.ReplicaModeMap[replicaName])
			continue
		}

		// Wrap the per-replica work in a closure so the deferred client Close runs
		// at the end of each iteration rather than at function return.
		func() {
			replicaServiceCli, err := GetServiceClient(replicaAddress)
			if err != nil {
				// Not marked ERR here; the replica never enters the candidate map, so
				// the final pass below marks it ERR as a non-candidate.
				e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress)
				return
			}

			defer func() {
				if errClose := replicaServiceCli.Close(); errClose != nil {
					e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress)
				}
			}()

			replica, err := replicaServiceCli.ReplicaGet(replicaName)
			if err != nil {
				e.log.WithError(err).Warnf("Marking replica %s from %v to ERR during salvage candidate filtering since failed to get replica info", replicaName, e.ReplicaModeMap[replicaName])
				e.ReplicaModeMap[replicaName] = types.ModeERR
				return
			}

			// NOTE(review): assumes replica.Head is always populated when ReplicaGet
			// succeeds - confirm, since it is dereferenced without a check.
			headSize := replica.Head.ActualSize
			volumeHeadSizeToReplicaNames[headSize] = append(volumeHeadSizeToReplicaNames[headSize], replicaName)
		}()
	}

	volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames)
	if err != nil {
		return errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size")
	}

	// Every replica may have been skipped or failed above, leaving nothing to pick
	// from. Indexing the last element of an empty slice would panic, so fail
	// explicitly instead.
	if len(volumeHeadSizeSorted) == 0 {
		return errors.New("failed to find any salvage candidate with volume head size")
	}

	largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1]
	e.log.Infof("Filtering salvage candidates by volume head size (%v) from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames)

	salvageCandidates := map[string]bool{}
	for _, replicaName := range volumeHeadSizeToReplicaNames[largestVolumeHeadSize] {
		salvageCandidates[replicaName] = true
	}

	for replicaName := range e.ReplicaAddressMap {
		if _, exist := salvageCandidates[replicaName]; !exist {
			e.log.Infof("Marking replica %s from %v to ERR since it's not a salvage candidate", replicaName, e.ReplicaModeMap[replicaName])
			e.ReplicaModeMap[replicaName] = types.ModeERR
			continue
		}

		e.log.Infof("Including replica %s with mode %v as a salvage candidate", replicaName, e.ReplicaModeMap[replicaName])
	}

	return nil
}

func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
return fmt.Errorf("unknown frontend type %s", e.Frontend)
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ
spdkClient := s.spdkClient
s.Unlock()

return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired)
return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested)
}

func localTargetExists(e *Engine) bool {
Expand Down

0 comments on commit 0d205b4

Please sign in to comment.