Skip to content

Commit

Permalink
feat(v2/auto-salvage): support EngineGet to use runtime replica info
Browse files (browse the repository at this point in the history)
longhorn/longhorn-8430

Signed-off-by: Chin-Ya Huang <[email protected]>
  • Loading branch information
c3y1huang committed Sep 27, 2024
1 parent 8c59842 commit 743f3f2
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 22 deletions.
5 changes: 3 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *SPDKClient) EngineDelete(name string) error {
return errors.Wrapf(err, "failed to delete SPDK engine %v", name)
}

func (c *SPDKClient) EngineGet(name string) (*api.Engine, error) {
func (c *SPDKClient) EngineGet(name string, runtimeRequested bool) (*api.Engine, error) {
if name == "" {
return nil, fmt.Errorf("failed to get SPDK engine: missing required parameter")
}
Expand All @@ -525,7 +525,8 @@ func (c *SPDKClient) EngineGet(name string) (*api.Engine, error) {
defer cancel()

resp, err := client.EngineGet(ctx, &spdkrpc.EngineGetRequest{
Name: name,
Name: name,
RuntimeRequested: runtimeRequested,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to get SPDK engine %v", name)
Expand Down
18 changes: 9 additions & 9 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
}
}

e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)

e.log.Infof("Connected all available replicas %+v, then launching raid during engine creation", e.ReplicaModeMap)
if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil {
Expand All @@ -241,7 +241,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str

var engineWithTarget *api.Engine
if initiatorIP != targetIP {
engineWithTarget, err = targetSPDKClient.EngineGet(e.Name)
engineWithTarget, err = targetSPDKClient.EngineGet(e.Name, false)
if err != nil {
return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress)
}
Expand Down Expand Up @@ -716,12 +716,12 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
// TODO: should we delete the engine automatically here?
}

e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)

return nil
}

func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() {
func (e *Engine) checkAndUpdateInfoFromReplicaNoLock(runtimeRequested bool) {
replicaMap := map[string]*api.Replica{}
replicaAncestorMap := map[string]*api.Lvol{}
// hasBackingImage := false
Expand Down Expand Up @@ -750,7 +750,7 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() {
}
}()

replica, err := replicaServiceCli.ReplicaGet(replicaName, true)
replica, err := replicaServiceCli.ReplicaGet(replicaName, runtimeRequested)
if err != nil {
e.log.WithError(err).Warnf("Failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, address, e.ReplicaModeMap[replicaName])
e.ReplicaModeMap[replicaName] = types.ModeERR
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe
if err != nil {
return err
}
e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)

rebuildingSnapshotList, err = getRebuildingSnapshotList(srcReplicaServiceCli, srcReplicaName)
if err != nil {
Expand Down Expand Up @@ -1292,7 +1292,7 @@ func (e *Engine) replicaAddFinish(srcReplicaServiceCli, dstReplicaServiceCli *cl
}
updateRequired = true
}
e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)

// The source replica blindly stops exposing the snapshot and wipes the rebuilding info.
if srcReplicaErr := srcReplicaServiceCli.ReplicaRebuildingSrcFinish(srcReplicaName, dstReplicaName); srcReplicaErr != nil {
Expand Down Expand Up @@ -1510,7 +1510,7 @@ func (e *Engine) snapshotOperation(spdkClient *spdkclient.Client, inputSnapshotN
return "", err
}

e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)

e.log.Infof("Engine finished snapshot operation %s %s", snapshotOp, snapshotName)

Expand Down Expand Up @@ -1557,7 +1557,7 @@ func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string]
if e.ReplicaModeMap[replicaName] == types.ModeWO {
return "", fmt.Errorf("engine %s contains WO replica %s during snapshot %s delete", e.Name, replicaName, snapshotName)
}
e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(false)
if len(e.SnapshotMap[snapshotName].Children) > 1 {
return "", fmt.Errorf("engine %s cannot delete snapshot %s since it contains multiple children %+v", e.Name, snapshotName, e.SnapshotMap[snapshotName].Children)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ func (s *Server) EngineGet(ctx context.Context, req *spdkrpc.EngineGetRequest) (
return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v", req.Name)
}

e.checkAndUpdateInfoFromReplicaNoLock()
e.checkAndUpdateInfoFromReplicaNoLock(req.RuntimeRequested)

return e.Get(), nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/spdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
c.Assert(err, IsNil)
err = spdkCli.EngineReplicaDelete(engineName, replicaName2, net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))))
c.Assert(err, IsNil)
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -410,7 +410,7 @@ func (s *TestSuite) TestSPDKMultipleThread(c *C) {
replica1.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))),
replica3.Name: net.JoinHostPort(ip, strconv.Itoa(int(replica3.PortStart))),
}
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -859,7 +859,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
c.Assert(err, IsNil)
err = spdkCli.EngineReplicaDelete(engineName, replicaName1, net.JoinHostPort(ip, strconv.Itoa(int(replica1.PortStart))))
c.Assert(err, IsNil)
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -904,7 +904,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
c.Assert(snapshotNameRebuild1, Not(Equals), "")

// Verify the 1st rebuilding result
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -953,7 +953,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
c.Assert(err, IsNil)
err = spdkCli.EngineReplicaDelete(engineName, replicaName2, net.JoinHostPort(ip, strconv.Itoa(int(replica2.PortStart))))
c.Assert(err, IsNil)
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -1002,7 +1002,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
c.Assert(snapshotNameRebuild2, Not(Equals), "")

// Verify the 2nd rebuilding result
engine, err = spdkCli.EngineGet(engineName)
engine, err = spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
c.Assert(engine.Frontend, Equals, types.FrontendSPDKTCPBlockdev)
Expand Down Expand Up @@ -1174,7 +1174,7 @@ func (s *TestSuite) TestSPDKMultipleThreadSnapshotOpsAndRebuilding(c *C) {
}

func checkReplicaSnapshots(c *C, spdkCli *client.SPDKClient, engineName string, replicaList []string, snapshotMap map[string][]string, snapshotOpts map[string]api.SnapshotOptions) {
engine, err := spdkCli.EngineGet(engineName)
engine, err := spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)

for _, replicaName := range replicaList {
Expand Down Expand Up @@ -1212,7 +1212,7 @@ func revertSnapshot(c *C, spdkCli *client.SPDKClient, snapshotName, volumeName,
c.Assert(err, IsNil)
os.Setenv(commonnet.EnvPodIP, ip)

engine, err := spdkCli.EngineGet(engineName)
engine, err := spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)

if engine.State != types.InstanceStateRunning {
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func WaitForReplicaRebuildingComplete(c *C, spdkCli *client.SPDKClient, engineNa
}

if rebuildingStatus.TotalState == types.ProgressStateComplete {
engine, err := spdkCli.EngineGet(engineName)
engine, err := spdkCli.EngineGet(engineName, false)
c.Assert(err, IsNil)
c.Assert(engine.State, Equals, types.InstanceStateRunning)
if engine.ReplicaModeMap[replicaName] == types.ModeRW {
Expand Down Expand Up @@ -1367,7 +1367,7 @@ func (s *TestSuite) TestSPDKEngineOnlyWithTarget(c *C) {
err = spdkCli.EngineDeleteTarget(engineName)
c.Assert(err, IsNil)

_, err = spdkCli.EngineGet(engineName)
_, err = spdkCli.EngineGet(engineName, false)
c.Assert(strings.Contains(err.Error(), "cannot find engine"), Equals, true)

err = spdkCli.ReplicaDelete(replicaName1, false)
Expand Down

0 comments on commit 743f3f2

Please sign in to comment.