Skip to content

Commit

Permalink
feat(v2/auto-salvage): filter salvage candidate by volume head size
Browse files Browse the repository at this point in the history
longhorn/longhorn-8430

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Nov 6, 2024
1 parent 9ba378c commit 1842401
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin
}

func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32,
initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) {
initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) {
if name == "" || volumeName == "" || len(replicaAddressMap) == 0 {
return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters")
}
Expand All @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui
UpgradeRequired: upgradeRequired,
TargetAddress: targetAddress,
InitiatorAddress: initiatorAddress,
SalvageRequested: salvageRequested,
})
if err != nil {
return nil, errors.Wrap(err, "failed to start SPDK engine")
Expand Down
87 changes: 86 additions & 1 deletion pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}
}

func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) {
func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) {
logrus.WithFields(logrus.Fields{
"portCount": portCount,
"upgradeRequired": upgradeRequired,
"replicaAddressMap": replicaAddressMap,
"initiatorAddress": initiatorAddress,
"targetAddress": targetAddress,
"salvageRequested": salvageRequested,
}).Info("Creating engine")

requireUpdate := true
Expand Down Expand Up @@ -192,10 +193,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
initiatorCreationRequired = false
}

if salvageRequested {
e.log.Info("Requesting salvage for engine replicas")

replicaAddressMap, err = e.filterSalvageCandidates(replicaAddressMap)
if err != nil {
return nil, errors.Wrapf(err, "failed to update replica mode to filter salvage candidates")
}
}

for replicaName, replicaAddr := range replicaAddressMap {
e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{
Address: replicaAddr,
}

bdevName, err := connectNVMfBdev(spdkClient, replicaName, replicaAddr)
if err != nil {
e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr)
Expand Down Expand Up @@ -272,6 +283,80 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
return e.getWithoutLock(), nil
}

// filterSalvageCandidates updates the replicaAddressMap by retaining only replicas
// eligible for salvage based on the largest volume head size.
//
// It iterates through all replicas and:
// - Retrieves the volume head size for each replica.
// - Identifies replicas with the largest volume head size as salvage candidates.
// - Remove the replicas that are not eligible as salvage candidates.
func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (map[string]string, error) {
// Intialize filteredCandidates to hold a copy of replicaAddressMap.
filteredCandidates := map[string]string{}
for key, value := range replicaAddressMap {
filteredCandidates[key] = value
}

volumeHeadSizeToReplicaNames := map[uint64][]string{}

// Collect volume head size for each replica.
for replicaName, replicaAddress := range replicaAddressMap {
func() {
// Get service client for the current replica.
replicaServiceCli, err := GetServiceClient(replicaAddress)
if err != nil {
e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress)
return
}

defer func() {
if errClose := replicaServiceCli.Close(); errClose != nil {
e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress)
}
}()

// Retrieve replica information.
replica, err := replicaServiceCli.ReplicaGet(replicaName)
if err != nil {
e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica info", replicaName, replicaAddress)
delete(filteredCandidates, replicaName)
return
}

// Map volume head size to replica names.
volumeHeadSizeToReplicaNames[replica.Head.ActualSize] = append(volumeHeadSizeToReplicaNames[replica.Head.ActualSize], replicaName)
}()
}

// Sort the volume head sizes to find the largest.
volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames)
if err != nil {
return nil, errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size")
}

if len(volumeHeadSizeSorted) == 0 {
return nil, errors.New("failed to find any salvage candidate with volume head size")
}

// Determine salvage candidates with the largest volume head size.
largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1]
e.log.Infof("Selecting salvage candidates with the largest volume head size %v from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames)

// Filter out replicas that do not match the largest volume head size.
salvageCandidates := volumeHeadSizeToReplicaNames[largestVolumeHeadSize]
for replicaName := range replicaAddressMap {
if !commonutils.Contains(salvageCandidates, replicaName) {
e.log.Infof("Skipping salvage for replica %s with address %s due to not having the largest volume head size (%v)", replicaName, replicaAddressMap[replicaName])
delete(filteredCandidates, replicaName)
continue
}

e.log.Infof("Including replica %s as a salvage candidate", replicaName)
}

return filteredCandidates, nil
}

func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
return fmt.Errorf("unknown frontend type %s", e.Frontend)
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ
spdkClient := s.spdkClient
s.Unlock()

return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired)
return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested)
}

func localTargetExists(e *Engine) bool {
Expand Down
16 changes: 8 additions & 8 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))),
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine without the frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand All @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine with the previous frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

c.Assert(engine.Endpoint, Equals, "")
Expand Down Expand Up @@ -1425,11 +1425,11 @@ func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false)
c.Assert(err, IsNil)
c.Assert(engine.Endpoint, Not(Equals), "")
// Initiator is not created, so the IP and Port should be empty
Expand Down

0 comments on commit 1842401

Please sign in to comment.