Skip to content

Commit

Permalink
feat(v2/auto-salvage): filter salvage candidate by volume head size
Browse files Browse the repository at this point in the history
longhorn/longhorn-8430

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Sep 27, 2024
1 parent 2e18100 commit f495ca1
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin
}

func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32,
initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) {
initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) {
if name == "" || volumeName == "" || len(replicaAddressMap) == 0 {
return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters")
}
Expand All @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui
UpgradeRequired: upgradeRequired,
TargetAddress: targetAddress,
InitiatorAddress: initiatorAddress,
SalvageRequested: salvageRequested,
})
if err != nil {
return nil, errors.Wrap(err, "failed to start SPDK engine")
Expand Down
77 changes: 76 additions & 1 deletion pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}
}

func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) {
func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) {
logrus.WithFields(logrus.Fields{
"portCount": portCount,
"upgradeRequired": upgradeRequired,
"replicaAddressMap": replicaAddressMap,
"initiatorAddress": initiatorAddress,
"targetAddress": targetAddress,
"salvageRequested": salvageRequested,
}).Info("Creating engine")

requireUpdate := true
Expand Down Expand Up @@ -206,6 +207,14 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
e.ReplicaAddressMap = replicaAddressMap
e.log = e.log.WithField("replicaAddressMap", replicaAddressMap)

if salvageRequested {
e.log.Info("Requesting salvage for engine replicas")

if err := e.updateInfoToFilterReplicaSalvageCandidates(); err != nil {
return nil, err
}
}

e.checkAndUpdateInfoFromReplicaNoLock()

e.log.Infof("Connected all available replicas %+v, then launching raid during engine creation", e.ReplicaModeMap)
Expand Down Expand Up @@ -271,6 +280,72 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
return e.getWithoutLock(), nil
}

// updateInfoToFilterReplicaSalvageCandidates updates replica information and filters
// out the replicas that are not eligible for salvage based on the volume head size.
//
// It iterates through all replicas and:
// - Skips the replicas with mode ERR.
// - Gets the volume head size of each replica.
// - Selects replicas with the largest volume head size as salvage candidates.
// - Marks the replicas that are not salvage candidates as mode ERR.
func (e *Engine) updateInfoToFilterReplicaSalvageCandidates() error {
volumeHeadSizeToReplicaNames := map[uint64][]string{}
for replicaName, replicaAddress := range e.ReplicaAddressMap {
if e.ReplicaModeMap[replicaName] == types.ModeERR {
e.log.Debugf("Skipping replica %s (mode %v) for salvage", replicaName, e.ReplicaModeMap[replicaName])
continue
}

func() {
replicaServiceCli, err := GetServiceClient(replicaAddress)
if err != nil {
e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress)
return
}

defer func() {
if errClose := replicaServiceCli.Close(); errClose != nil {
e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress)
}
}()

replica, err := replicaServiceCli.ReplicaGet(replicaName)
if err != nil {
e.log.WithError(err).Warnf("Marking replica %s from %v to ERR during salvage candidate filtering since failed to get replica info", replicaName, e.ReplicaModeMap[replicaName])
e.ReplicaModeMap[replicaName] = types.ModeERR
return
}

volumeHeadSizeToReplicaNames[replica.Head.ActualSize] = append(volumeHeadSizeToReplicaNames[replica.Head.ActualSize], replicaName)
}()
}

volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames)
if err != nil {
return errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size")
}

largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1]
e.log.Infof("Filtering salvage candidates by volume head size (%v) from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames)

salvageCandidates := map[string]bool{}
for _, replicaName := range volumeHeadSizeToReplicaNames[largestVolumeHeadSize] {
salvageCandidates[replicaName] = true
}

for replicaName := range e.ReplicaAddressMap {
if _, exist := salvageCandidates[replicaName]; !exist {
e.log.Infof("Marking replica %s from %v to ERR since it's not a salvage candidate", replicaName, e.ReplicaModeMap[replicaName])
e.ReplicaModeMap[replicaName] = types.ModeERR
continue
}

e.log.Infof("Including replica %s with mode %v as a salvage candidate", replicaName, e.ReplicaModeMap[replicaName])
}

return nil
}

func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
return fmt.Errorf("unknown frontend type %s", e.Frontend)
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ
spdkClient := s.spdkClient
s.Unlock()

return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired)
return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested)
}

func localTargetExists(e *Engine) bool {
Expand Down
16 changes: 8 additions & 8 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))),
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine without the frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand All @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine with the previous frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

c.Assert(engine.Endpoint, Equals, "")
Expand Down Expand Up @@ -1425,11 +1425,11 @@ func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false)
c.Assert(err, IsNil)
c.Assert(engine.Endpoint, Not(Equals), "")
// Initiator is not created, so the IP and Port should be empty
Expand Down

0 comments on commit f495ca1

Please sign in to comment.