Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(auto-salvage): v2 support #222

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin
}

func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32,
initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) {
initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) {
if name == "" || volumeName == "" || len(replicaAddressMap) == 0 {
return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters")
}
Expand All @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui
UpgradeRequired: upgradeRequired,
TargetAddress: targetAddress,
InitiatorAddress: initiatorAddress,
SalvageRequested: salvageRequested,
})
if err != nil {
return nil, errors.Wrap(err, "failed to start SPDK engine")
Expand Down
93 changes: 89 additions & 4 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}
}

func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) {
func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) {
logrus.WithFields(logrus.Fields{
"portCount": portCount,
"upgradeRequired": upgradeRequired,
"replicaAddressMap": replicaAddressMap,
"initiatorAddress": initiatorAddress,
"targetAddress": targetAddress,
"salvageRequested": salvageRequested,
}).Info("Creating engine")

requireUpdate := true
Expand Down Expand Up @@ -192,10 +193,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
initiatorCreationRequired = false
}

if salvageRequested {
e.log.Info("Requesting salvage for engine replicas")

replicaAddressMap, err = e.filterSalvageCandidates(replicaAddressMap)
if err != nil {
return nil, errors.Wrapf(err, "failed to update replica mode to filter salvage candidates")
}
}

for replicaName, replicaAddr := range replicaAddressMap {
e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{
Address: replicaAddr,
}

bdevName, err := connectNVMfBdev(spdkClient, replicaName, replicaAddr)
if err != nil {
e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr)
Expand Down Expand Up @@ -272,6 +283,80 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
return e.getWithoutLock(), nil
}

// filterSalvageCandidates returns a new map holding only the replicas that are
// eligible for salvage, i.e. the replicas whose volume head reports the largest
// actual size among all replicas that could be queried.
//
// For each entry of replicaAddressMap it:
//   - Opens a service client to the replica address and retrieves the replica info.
//   - Groups the replica names by the actual size of their volume head.
//   - Keeps the replicas belonging to the group with the largest volume head size.
//
// Replicas whose service client cannot be created or whose info cannot be
// retrieved are logged and excluded from the result. The input map is never
// modified.
//
// An error is returned when no replica yields a volume head size, or when the
// collected sizes cannot be sorted.
func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (map[string]string, error) {
	volumeHeadSizeToReplicaNames := map[uint64][]string{}

	// Collect volume head size for each replica.
	for replicaName, replicaAddress := range replicaAddressMap {
		// The per-replica work runs in a closure so the deferred client close
		// fires at the end of each iteration rather than at function return.
		func() {
			// Get service client for the current replica.
			replicaServiceCli, err := GetServiceClient(replicaAddress)
			if err != nil {
				e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress)
				return
			}

			defer func() {
				if errClose := replicaServiceCli.Close(); errClose != nil {
					e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress)
				}
			}()

			// Retrieve replica information.
			replica, err := replicaServiceCli.ReplicaGet(replicaName)
			if err != nil {
				e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica info", replicaName, replicaAddress)
				return
			}

			// Map volume head size to replica names.
			headSize := replica.Head.ActualSize
			volumeHeadSizeToReplicaNames[headSize] = append(volumeHeadSizeToReplicaNames[headSize], replicaName)
		}()
	}

	// Sort the volume head sizes to find the largest.
	volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames)
	if err != nil {
		return nil, errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size")
	}

	if len(volumeHeadSizeSorted) == 0 {
		return nil, errors.New("failed to find any salvage candidate with volume head size")
	}

	// Determine salvage candidates with the largest volume head size.
	largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1]
	e.log.Infof("Selecting salvage candidates with the largest volume head size %v from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames)

	// Build the result from the selected candidates only. Replicas that failed
	// to report a size are absent from every size group, so they are excluded
	// here as well.
	salvageCandidates := volumeHeadSizeToReplicaNames[largestVolumeHeadSize]
	filteredCandidates := make(map[string]string, len(salvageCandidates))
	for replicaName, replicaAddress := range replicaAddressMap {
		if !commonutils.Contains(salvageCandidates, replicaName) {
			e.log.Infof("Skipping salvage for replica %s with address %s due to not having the largest volume head size (%v)", replicaName, replicaAddress, largestVolumeHeadSize)
			continue
		}

		e.log.Infof("Including replica %s as a salvage candidate", replicaName)
		filteredCandidates[replicaName] = replicaAddress
	}

	return filteredCandidates, nil
}

func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
return fmt.Errorf("unknown frontend type %s", e.Frontend)
Expand Down Expand Up @@ -1812,12 +1897,12 @@ func (e *Engine) waitForRestoreComplete() error {
var err error
for range periodicChecker.C {
isReplicaRestoreCompleted := true
for replicaName, replicaAddress := range e.ReplicaAddressMap {
if e.ReplicaModeMap[replicaName] != types.ModeRW {
for replicaName, replicaStatus := range e.ReplicaStatusMap {
if replicaStatus.Mode != types.ModeRW {
continue
}

isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress)
isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaStatus.Address)
if err != nil {
return errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}
Expand Down
58 changes: 50 additions & 8 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,11 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio
}
headSvcLvol := r.ActiveChain[r.ChainLength-1]

if headSvcLvol.UUID == "" && r.State == types.InstanceStateStopped {
r.log.Debugf("Updating replica %s state from %v to %v because headSvcLvol.UUID is empty", r.Name, r.State, types.InstanceStatePending)
r.State = types.InstanceStatePending
}
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved

// Create bdev lvol if the replica is the new one
if r.State == types.InstanceStatePending {
var lvsList []spdktypes.LvstoreInfo
Expand All @@ -636,17 +641,46 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio
return nil, fmt.Errorf("found mismatching between the actual lvstore name %s with UUID %s and the recorded lvstore name %s with UUID %s during replica %s creation", lvsList[0].Name, lvsList[0].UUID, r.LvsName, r.LvsUUID, r.Name)
}

r.log.Info("Creating a lvol bdev for the new replica")
if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil {
return nil, err
}
bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0)
if err != nil {
return nil, err
if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) {
return nil, errors.Wrapf(err, "failed to check existence of lvol bdev for the new replica %v", r.Name)
}

if len(bdevLvolList) == 0 {
r.log.Info("Creating a lvol bdev for the new replica")
if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil {
return nil, err
}
bdevLvolList, err = spdkClient.BdevLvolGet(r.Alias, 0)
if err != nil {
return nil, err
}
} else {
r.log.Infof("Skipping creating a lvol bdev %v during replica creation because it already exists", r.Alias)

replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool {
var lvolName string
if len(bdev.Aliases) == 1 {
lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0])
}
return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName)
}
bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter)
if err != nil {
return nil, err
}

r.log.Infof("Constructing replica %v object during replica creation", r.Name)
err = r.construct(bdevLvolMap)
if err != nil {
return nil, err
}
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
}

if len(bdevLvolList) < 1 {
return nil, fmt.Errorf("cannot find lvol %v after creation", r.Alias)
}

headSvcLvol.UUID = bdevLvolList[0].UUID
headSvcLvol.CreationTime = bdevLvolList[0].CreationTime
headSvcLvol.ActualSize = bdevLvolList[0].DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize
Expand All @@ -670,9 +704,17 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio
}
r.portAllocator = bitmap

nqn := helpertypes.GetNQN(r.Name)

// Blindly stop exposing the bdev if it exists. This is to avoid potential inconsistencies during salvage case.
if err := spdkClient.StopExposeBdev(nqn); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) {
return nil, errors.Wrapf(err, "failed to stop expose replica %v", r.Name)
}

nguid := commonutils.RandomID(nvmeNguidLength)
if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil {
return nil, err

if err := spdkClient.StartExposeBdev(nqn, headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil {
return nil, errors.Wrapf(err, "failed to expose replica %v", r.Name)
}
r.IsExposed = true
r.State = types.InstanceStateRunning
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ
spdkClient := s.spdkClient
s.Unlock()

return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired)
return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested)
}

func localTargetExists(e *Engine) bool {
Expand Down
16 changes: 8 additions & 8 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -351,7 +351,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))),
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
replica2.Name: types.ModeRW,
}
endpoint := helperutil.GetLonghornDevicePath(volumeName)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1225,7 +1225,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine without the frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendEmpty, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand All @@ -1240,7 +1240,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
// Restart the engine with the previous frontend
err = spdkCli.EngineDelete(engineName)
c.Assert(err, IsNil)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false)
engine, err = spdkCli.EngineCreate(engineName, volumeName, prevFrontend, defaultTestLvolSize, replicaAddressMap, 1, ip, ip, false, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.ReplicaAddressMap, DeepEquals, replicaAddressMap)
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

c.Assert(engine.Endpoint, Equals, "")
Expand Down Expand Up @@ -1425,11 +1425,11 @@ func (s *TestSuite) TestSPDKEngineCreateWithUpgradeRequired(c *C) {
replica2.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))),
}

engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false)
engine, err := spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, "127.0.0.1", ip, false, false)
c.Assert(err, IsNil)

targetAddress := fmt.Sprintf("%s:%d", engine.TargetIP, engine.TargetPort)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true)
engine, err = spdkCli.EngineCreate(engineName, volumeName, types.FrontendSPDKTCPBlockdev, defaultTestLvolSize, replicaAddressMap, 1, ip, targetAddress, true, false)
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
c.Assert(engine.Endpoint, Not(Equals), "")
// Initiator is not created, so the IP and Port should be empty
Expand Down
Loading