From a6554babe8f73b5683057d45a544d1904cbfefc3 Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Fri, 20 Sep 2024 14:34:05 +0800 Subject: [PATCH] feat(v2/auto-salvage): filter salvage candidate by volume head size longhorn/longhorn-8430 Signed-off-by: Chin-Ya Huang --- pkg/client/client.go | 3 +- pkg/spdk/engine.go | 77 +++++++++++++++++++++++++++++++++++++++++++- pkg/spdk/server.go | 2 +- pkg/spdk_test.go | 16 ++++----- 4 files changed, 87 insertions(+), 11 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 0c3f2bcc..b67f20ea 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, + SalvageRequested: salvageRequested, }) if err != nil { return nil, errors.Wrap(err, "failed to start SPDK engine") diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 8146a1a2..96c5f053 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -102,13 +102,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, + "salvageRequested": salvageRequested, }).Info("Creating engine") requireUpdate := true @@ -206,6 +207,14 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaAddressMap = replicaAddressMap e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + if salvageRequested { + e.log.Info("Requesting salvage for engine replicas") + + if err := e.updateInfoToFilterReplicaSalvageCandidates(); err != nil { + return nil, err + } + } + e.checkAndUpdateInfoFromReplicaNoLock() e.log.Infof("Connected all available replicas %+v, then launching raid during engine creation", e.ReplicaModeMap) @@ -271,6 +280,72 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str return e.getWithoutLock(), nil } +// updateInfoToFilterReplicaSalvageCandidates updates replica information and filters +// out the replicas that are not eligible for salvage based on the volume head size. +// +// It iterates through all replicas and: +// - Skips the replicas with mode ERR. +// - Gets the volume head size of each replica. +// - Selects replicas with the largest volume head size as salvage candidates. +// - Marks the replicas that are not salvage candidates as mode ERR. +func (e *Engine) updateInfoToFilterReplicaSalvageCandidates() error { + volumeHeadSizeToReplicaNames := map[uint64][]string{} + for replicaName, replicaAddress := range e.ReplicaAddressMap { + if e.ReplicaModeMap[replicaName] == types.ModeERR { + e.log.Debugf("Skipping replica %s (mode %v) for salvage", replicaName, e.ReplicaModeMap[replicaName]) + continue + } + + func() { + replicaServiceCli, err := GetServiceClient(replicaAddress) + if err != nil { + e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress) + return + } + + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress) + } + }() + + replica, err := replicaServiceCli.ReplicaGet(replicaName) + if err != nil { + e.log.WithError(err).Warnf("Marking replica %s from %v to ERR during salvage candidate filtering since failed to get replica info", replicaName, e.ReplicaModeMap[replicaName]) + e.ReplicaModeMap[replicaName] = types.ModeERR + return + } + + volumeHeadSizeToReplicaNames[replica.Head.ActualSize] = append(volumeHeadSizeToReplicaNames[replica.Head.ActualSize], replicaName) + }() + } + + volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames) + if err != nil { + return errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size") + } + + largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1] + e.log.Infof("Filtering salvage candidates by volume head size (%v) from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames) + + salvageCandidates := map[string]bool{} + for _, replicaName := range volumeHeadSizeToReplicaNames[largestVolumeHeadSize] { + salvageCandidates[replicaName] = true + } + + for replicaName := range e.ReplicaAddressMap { + if _, exist := salvageCandidates[replicaName]; !exist { + e.log.Infof("Marking replica %s from %v to ERR since it's not a salvage candidate", replicaName, e.ReplicaModeMap[replicaName]) + e.ReplicaModeMap[replicaName] = types.ModeERR + continue + } + + e.log.Infof("Including replica %s with mode %v as a salvage candidate", replicaName, e.ReplicaModeMap[replicaName]) + } + + return nil +} + func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) diff --git a/pkg/spdk/server.go b/pkg/spdk/server.go index 7d0b2658..2e40fec8 100644 --- a/pkg/spdk/server.go +++ b/pkg/spdk/server.go @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/pkg/spdk_test.go b/pkg/spdk_test.go index 8ccbafba..1123fc6f 100644 --- a/pkg/spdk_test.go +++ b/pkg/spdk_test.go @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) { replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))), replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) { replica2.Name: types.ModeRW, } endpoint := helperutil.GetLonghornDevicePath(volumeName) - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine without the frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName, // Restart the engine with the previous frontend err = spdkCli.EngineDelete(engineName) c.Assert(err, IsNil) - engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false) + engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false) c.Assert(err, IsNil) c.Assert(engine.State, Equals, types.InstanceStateRunning) c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap) @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Equals, "") @@ -1425,11 +1425,11 @@ func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) { replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))), } - engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false) + engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false) c.Assert(err, IsNil) targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort) - engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true) + engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false) c.Assert(err, IsNil) c.Assert(engine.Endpoint, Not(Equals), "") // Initiator is not created, so the IP and Port should be empty