Skip to content

Commit

Permalink
feat(v2/auto-salvage): support ReplicaGet using runtime replica info
Browse files (browse the repository at this point in the history)
longhorn/longhorn-8430

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Sep 27, 2024
1 parent f495ca1 commit 8c59842
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 13 deletions.
5 changes: 3 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *SPDKClient) ReplicaDelete(name string, cleanupRequired bool) error {
return errors.Wrapf(err, "failed to delete SPDK replica %v", name)
}

func (c *SPDKClient) ReplicaGet(name string) (*api.Replica, error) {
func (c *SPDKClient) ReplicaGet(name string, runtimeRequested bool) (*api.Replica, error) {
if name == "" {
return nil, fmt.Errorf("failed to get SPDK replica: missing required parameter")
}
Expand All @@ -102,7 +102,8 @@ func (c *SPDKClient) ReplicaGet(name string) (*api.Replica, error) {
defer cancel()

resp, err := client.ReplicaGet(ctx, &spdkrpc.ReplicaGetRequest{
Name: name,
Name: name,
RuntimeRequested: runtimeRequested,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to get SPDK replica %v", name)
Expand Down
10 changes: 5 additions & 5 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (e *Engine) updateInfoToFilterReplicaSalvageCandidates() error {
}
}()

replica, err := replicaServiceCli.ReplicaGet(replicaName)
replica, err := replicaServiceCli.ReplicaGet(replicaName, true)
if err != nil {
e.log.WithError(err).Warnf("Marking replica %s from %v to ERR during salvage candidate filtering since failed to get replica info", replicaName, e.ReplicaModeMap[replicaName])
e.ReplicaModeMap[replicaName] = types.ModeERR
Expand Down Expand Up @@ -750,7 +750,7 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() {
}
}()

replica, err := replicaServiceCli.ReplicaGet(replicaName)
replica, err := replicaServiceCli.ReplicaGet(replicaName, true)
if err != nil {
e.log.WithError(err).Warnf("Failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, address, e.ReplicaModeMap[replicaName])
e.ReplicaModeMap[replicaName] = types.ModeERR
Expand Down Expand Up @@ -1321,7 +1321,7 @@ func (e *Engine) getReplicaAddSrcReplica() (srcReplicaName, srcReplicaAddress st
}

func getRebuildingSnapshotList(srcReplicaServiceCli *client.SPDKClient, srcReplicaName string) ([]*api.Lvol, error) {
rpcSrcReplica, err := srcReplicaServiceCli.ReplicaGet(srcReplicaName)
rpcSrcReplica, err := srcReplicaServiceCli.ReplicaGet(srcReplicaName, false)
if err != nil {
return []*api.Lvol{}, err
}
Expand Down Expand Up @@ -1577,7 +1577,7 @@ func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string]
if e.ReplicaModeMap[replicaName] == types.ModeWO {
return "", fmt.Errorf("engine %s contains WO replica %s during snapshot %s revert", e.Name, replicaName, snapshotName)
}
r, err := replicaClients[replicaName].ReplicaGet(replicaName)
r, err := replicaClients[replicaName].ReplicaGet(replicaName, false)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -1685,7 +1685,7 @@ func (e *Engine) ReplicaList(spdkClient *spdkclient.Client) (ret map[string]*api
}
}()

replica, err := replicaServiceCli.ReplicaGet(name)
replica, err := replicaServiceCli.ReplicaGet(name, false)
if err != nil {
e.log.WithError(err).Errorf("Failed to get replica %s with address %s", name, address)
return
Expand Down
45 changes: 45 additions & 0 deletions pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,49 @@ func (s *Server) ReplicaGet(ctx context.Context, req *spdkrpc.ReplicaGetRequest)
r := s.replicaMap[req.Name]
s.RUnlock()

rRuntime := &Replica{}
spdkClient := s.spdkClient
if req.RuntimeRequested {
bdevLvolMap, err := GetBdevLvolMap(spdkClient)
if err != nil {
return nil, err
}

for lvolName, bdevLvol := range bdevLvolMap {
if lvolName != req.Name {
continue
}

lvsUUID := bdevLvol.DriverSpecific.Lvol.LvolStoreUUID

lvsList, err := spdkClient.BdevLvolGetLvstore("", lvsUUID)
if err != nil {
return nil, err
}

if len(lvsList) == 0 {
return nil, fmt.Errorf("failed to find lvs with UUID %v", lvsUUID)
}

if len(lvsList) > 1 {
return nil, fmt.Errorf("found more than one lvs with UUID %v", lvsUUID)
}

specSize := bdevLvol.NumBlocks * uint64(bdevLvol.BlockSize)
actualSize := bdevLvol.DriverSpecific.Lvol.NumAllocatedClusters * uint64(defaultClusterSize)
rRuntime = NewReplica(s.ctx, lvolName, lvsList[0].Name, lvsUUID, specSize, actualSize, s.updateChs[types.InstanceTypeReplica])

err = rRuntime.Sync(spdkClient)
if err != nil && jsonrpc.IsJSONRPCRespErrorBrokenPipe(err) {
return nil, errors.Wrapf(err, "failed to sync replica %v", req.Name)
}

break
}

r = rRuntime
}

if r == nil {
return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find replica %v", req.Name)
}
Expand Down Expand Up @@ -1000,6 +1043,8 @@ func (s *Server) EngineGet(ctx context.Context, req *spdkrpc.EngineGetRequest) (
return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v", req.Name)
}

e.checkAndUpdateInfoFromReplicaNoLock()

return e.Get(), nil
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,15 +324,15 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
err = spdkCli.ReplicaDelete(replicaName2, false)
c.Assert(err, IsNil)

replica1, err = spdkCli.ReplicaGet(replicaName1)
replica1, err = spdkCli.ReplicaGet(replicaName1, false)
c.Assert(err, IsNil)
c.Assert(replica1.LvsName, Equals, defaultTestDiskName)
c.Assert(replica1.LvsUUID, Equals, disk.Uuid)
c.Assert(replica1.State, Equals, types.InstanceStateStopped)
c.Assert(replica1.PortStart, Equals, int32(0))
c.Assert(replica1.PortEnd, Equals, int32(0))

replica2, err = spdkCli.ReplicaGet(replicaName1)
replica2, err = spdkCli.ReplicaGet(replicaName1, false)
c.Assert(err, IsNil)
c.Assert(replica2.LvsName, Equals, defaultTestDiskName)
c.Assert(replica2.LvsUUID, Equals, disk.Uuid)
Expand Down Expand Up @@ -886,7 +886,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
replicaAddressMap[replica3.Name] = net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart)))
snapshotNameRebuild1 := ""
for replicaName := range replicaAddressMap {
replica, err := spdkCli.ReplicaGet(replicaName)
replica, err := spdkCli.ReplicaGet(replicaName, false)
c.Assert(err, IsNil)
for snapName, snapLvol := range replica.Snapshots {
if strings.HasPrefix(snapName, server.RebuildingSnapshotNamePrefix) {
Expand Down Expand Up @@ -984,7 +984,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
replicaAddressMap[replica4.Name] = net.JoinHostPort(ip, strconv.Itoa(int(replica4.PortStart)))
snapshotNameRebuild2 := ""
for replicaName := range replicaAddressMap {
replica, err := spdkCli.ReplicaGet(replicaName)
replica, err := spdkCli.ReplicaGet(replicaName, false)
c.Assert(err, IsNil)
for snapName, snapLvol := range replica.Snapshots {
if snapName != snapshotNameRebuild1 && strings.HasPrefix(snapName, server.RebuildingSnapshotNamePrefix) {
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
delete(snapshotOpts, snapshotNameRebuild1)
checkReplicaSnapshots(c, spdkCli, engineName, []string{replicaName3, replicaName4}, snapshotMap, snapshotOpts)
for _, replicaName := range []string{replicaName3, replicaName4} {
replica, err := spdkCli.ReplicaGet(replicaName)
replica, err := spdkCli.ReplicaGet(replicaName, false)
c.Assert(err, IsNil)
c.Assert(replica.Head.Parent, Equals, snapshotNameRebuild2)
c.Assert(replica.Head.ActualSize, Equals, uint64(0))
Expand Down Expand Up @@ -1178,7 +1178,7 @@ func checkReplicaSnapshots(c *C, spdkCli *client.SPDKClient, engineName string,
c.Assert(err, IsNil)

for _, replicaName := range replicaList {
replica, err := spdkCli.ReplicaGet(replicaName)
replica, err := spdkCli.ReplicaGet(replicaName, false)
c.Assert(err, IsNil)
c.Assert(len(replica.Snapshots), Equals, len(snapshotMap))

Expand Down

0 comments on commit 8c59842

Please sign in to comment.