Skip to content

Commit

Permalink
feat(v2 upgrade): support engine live upgrade
Browse files Browse the repository at this point in the history
Longhorn 9104

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Dec 10, 2024
1 parent 4eec71b commit 8b80320
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 29 deletions.
5 changes: 4 additions & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package api

import (
"github.com/longhorn/types/pkg/generated/spdkrpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/longhorn/types/pkg/generated/spdkrpc"

"github.com/longhorn/longhorn-spdk-engine/pkg/types"
)

Expand Down Expand Up @@ -129,6 +130,7 @@ type Engine struct {
Port int32 `json:"port"`
TargetIP string `json:"target_ip"`
TargetPort int32 `json:"target_port"`
StandbyTargetPort int32 `json:"standby_target_port"`
ReplicaAddressMap map[string]string `json:"replica_address_map"`
ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"`
Head *Lvol `json:"head"`
Expand All @@ -149,6 +151,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine {
Port: e.Port,
TargetIP: e.TargetIp,
TargetPort: e.TargetPort,
StandbyTargetPort: e.StandbyTargetPort,
ReplicaAddressMap: e.ReplicaAddressMap,
ReplicaModeMap: map[string]types.Mode{},
Head: ProtoLvolToLvol(e.Head),
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/longhorn/types/pkg/generated/spdkrpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/longhorn/types/pkg/generated/spdkrpc"

"github.com/longhorn/longhorn-spdk-engine/pkg/api"
"github.com/longhorn/longhorn-spdk-engine/pkg/util"
)
Expand Down
86 changes: 59 additions & 27 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ import (
type Engine struct {
sync.RWMutex

Name string
VolumeName string
SpecSize uint64
ActualSize uint64
IP string
Port int32 // Port that initiator is connecting to
TargetIP string
TargetPort int32 // Port of the target that is used for letting initiator connect to
Frontend string
Endpoint string
Nqn string
Nguid string
Name string
VolumeName string
SpecSize uint64
ActualSize uint64
IP string
Port int32 // Port that initiator is connecting to
TargetIP string
TargetPort int32 // Port of the target that is used for letting initiator connect to
StandbyTargetPort int32
Frontend string
Endpoint string
Nqn string
Nguid string

ctrlrLossTimeout int
fastIOFailTimeoutSec int
Expand Down Expand Up @@ -115,7 +116,7 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}

func (e *Engine) isNewEngine() bool {
return e.IP == "" && e.TargetIP == ""
return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0
}

func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) {
Expand All @@ -132,7 +133,11 @@ func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP,
} else if e.Port != 0 && e.TargetPort == 0 {
// Only target instance creation is required, because the initiator instance is already running
e.log.Info("Creating a target instance")
targetCreationRequired = true
if e.StandbyTargetPort != 0 {
e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort)
} else {
targetCreationRequired = true
}

Check warning on line 140 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L136-L140

Added lines #L136 - L140 were not covered by tests
} else {
e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort)
}
Expand Down Expand Up @@ -404,6 +409,13 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m
return filteredCandidates, nil
}

func (e *Engine) isStandbyTargetCreationRequired() bool {
// e.Port is non-zero which means the initiator instance is already created and connected to a target instance.
// e.TargetPort is zero which means the target instance is not created on the same pod.
// Thus, a standby target instance should be created for the target instance switch-over.
return e.Port != 0 && e.TargetPort == 0

Check warning on line 416 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L413-L416

Added lines #L413 - L416 were not covered by tests
}

Check warning on line 418 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L418

Added line #L418 was not covered by tests
func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string,
initiatorCreationRequired, targetCreationRequired bool) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
Expand All @@ -415,6 +427,8 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc
return nil
}

standbyTargetCreationRequired := e.isStandbyTargetCreationRequired()

Check warning on line 431 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L430-L431

Added lines #L430 - L431 were not covered by tests
targetIP, targetPort, err := splitHostPort(targetAddress)
if err != nil {
return err
Expand All @@ -432,14 +446,16 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc

defer func() {
if err == nil {
e.initiator = initiator
e.dmDeviceIsBusy = dmDeviceIsBusy
e.Endpoint = initiator.GetEndpoint()
e.log = e.log.WithFields(logrus.Fields{
"endpoint": e.Endpoint,
"port": e.Port,
"targetPort": e.TargetPort,
})
if !standbyTargetCreationRequired {
e.initiator = initiator
e.dmDeviceIsBusy = dmDeviceIsBusy
e.Endpoint = initiator.GetEndpoint()
e.log = e.log.WithFields(logrus.Fields{
"endpoint": e.Endpoint,
"port": e.Port,
"targetPort": e.TargetPort,
})
}

Check warning on line 458 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L449-L458

Added lines #L449 - L458 were not covered by tests

e.log.Infof("Finished handling frontend for engine: %+v", e)
}
Expand Down Expand Up @@ -491,7 +507,11 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc
e.Port = port
}
if targetCreationRequired {
e.TargetPort = port
if standbyTargetCreationRequired {
e.StandbyTargetPort = port
} else {
e.TargetPort = port
}

Check warning on line 514 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L511-L514

Added lines #L511 - L514 were not covered by tests
}

if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil {
Expand Down Expand Up @@ -650,6 +670,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) {
Port: e.Port,
TargetIp: e.TargetIP,
TargetPort: e.TargetPort,
StandbyTargetPort: e.StandbyTargetPort,

Check warning on line 673 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L673

Added line #L673 was not covered by tests
Snapshots: map[string]*spdkrpc.Lvol{},
Frontend: e.Frontend,
Endpoint: e.Endpoint,
Expand Down Expand Up @@ -2270,7 +2291,9 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddres

if newTargetIP == podIP {
e.TargetPort = newTargetPort
e.StandbyTargetPort = 0

Check warning on line 2294 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L2294

Added line #L2294 was not covered by tests
} else {
e.StandbyTargetPort = e.TargetPort

Check warning on line 2296 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L2296

Added line #L2296 was not covered by tests
e.TargetPort = 0
}

Expand Down Expand Up @@ -2411,16 +2434,16 @@ func (e *Engine) connectTarget(targetAddress string) error {

// DeleteTarget deletes the target instance
func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) {
e.log.Infof("Deleting target with target port %d", e.TargetPort)
e.log.Infof("Deleting target with target port %d and standby target port %d", e.TargetPort, e.StandbyTargetPort)

Check warning on line 2437 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L2437

Added line #L2437 was not covered by tests

err = spdkClient.StopExposeBdev(e.Nqn)
if err != nil {
return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name)
}

err = e.releaseTargetPort(superiorPortAllocator)
err = e.releaseTargetAndStandbyTargetPorts(superiorPortAllocator)
if err != nil {
return errors.Wrapf(err, "failed to release target port while deleting target instance for engine %s", e.Name)
return errors.Wrapf(err, "failed to release target and standby target ports while deleting target instance for engine %s", e.Name)

Check warning on line 2446 in pkg/spdk/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/spdk/engine.go#L2446

Added line #L2446 was not covered by tests
}

e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name)
Expand All @@ -2446,8 +2469,9 @@ func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool
return oldTargetAddress != newTargetAddress
}

func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) error {
func (e *Engine) releaseTargetAndStandbyTargetPorts(superiorPortAllocator *commonbitmap.Bitmap) error {
releaseTargetPortRequired := e.TargetPort != 0
releaseStandbyTargetPortRequired := e.StandbyTargetPort != 0 && e.StandbyTargetPort != e.TargetPort

// Release the target port
if releaseTargetPortRequired {
Expand All @@ -2457,5 +2481,13 @@ func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) e
}
e.TargetPort = 0

// Release the standby target port
if releaseStandbyTargetPortRequired {
if err := superiorPortAllocator.ReleaseRange(e.StandbyTargetPort, e.StandbyTargetPort); err != nil {
return errors.Wrapf(err, "failed to release standby target port %d", e.StandbyTargetPort)
}
}
e.StandbyTargetPort = 0

return nil
}
Loading

0 comments on commit 8b80320

Please sign in to comment.