Skip to content

Commit

Permalink
feat(v2 upgrade): support engine live upgrade
Browse files Browse the repository at this point in the history
Longhorn 9104

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Dec 11, 2024
1 parent 7eddcf6 commit 79de939
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 28 deletions.
5 changes: 4 additions & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package api

import (
"github.com/longhorn/types/pkg/generated/spdkrpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/longhorn/types/pkg/generated/spdkrpc"

"github.com/longhorn/longhorn-spdk-engine/pkg/types"
)

Expand Down Expand Up @@ -129,6 +130,7 @@ type Engine struct {
Port int32 `json:"port"`
TargetIP string `json:"target_ip"`
TargetPort int32 `json:"target_port"`
StandbyTargetPort int32 `json:"standby_target_port"`
ReplicaAddressMap map[string]string `json:"replica_address_map"`
ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"`
Head *Lvol `json:"head"`
Expand All @@ -149,6 +151,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine {
Port: e.Port,
TargetIP: e.TargetIp,
TargetPort: e.TargetPort,
StandbyTargetPort: e.StandbyTargetPort,
ReplicaAddressMap: e.ReplicaAddressMap,
ReplicaModeMap: map[string]types.Mode{},
Head: ProtoLvolToLvol(e.Head),
Expand Down
86 changes: 59 additions & 27 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ import (
type Engine struct {
sync.RWMutex

Name string
VolumeName string
SpecSize uint64
ActualSize uint64
IP string
Port int32 // Port that initiator is connecting to
TargetIP string
TargetPort int32 // Port of the target that is used for letting initiator connect to
Frontend string
Endpoint string
Nqn string
Nguid string
Name string
VolumeName string
SpecSize uint64
ActualSize uint64
IP string
Port int32 // Port that initiator is connecting to
TargetIP string
TargetPort int32 // Port of the target that is used for letting initiator connect to
StandbyTargetPort int32
Frontend string
Endpoint string
Nqn string
Nguid string

ctrlrLossTimeout int
fastIOFailTimeoutSec int
Expand Down Expand Up @@ -115,7 +116,7 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
}

// isNewEngine reports whether the engine carries no recorded frontend state
// yet: no initiator IP, no target IP, and no standby target port.
func (e *Engine) isNewEngine() bool {
	// A non-zero StandbyTargetPort means a standby target port has already
	// been recorded for this engine, so it must not be treated as new even
	// when both IP fields are still empty.
	return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0
}

func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) {
Expand All @@ -132,7 +133,11 @@ func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP,
} else if e.Port != 0 && e.TargetPort == 0 {
// Only target instance creation is required, because the initiator instance is already running
e.log.Info("Creating a target instance")
targetCreationRequired = true
if e.StandbyTargetPort != 0 {
e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort)
} else {
targetCreationRequired = true
}
} else {
e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort)
}
Expand Down Expand Up @@ -404,6 +409,13 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m
return filteredCandidates, nil
}

// isStandbyTargetCreationRequired reports whether a standby target instance
// should be created so that the target can later be switched over.
//
// It returns true only when both of the following hold:
//   - e.Port is non-zero, i.e. the initiator instance is already created and
//     connected to a target instance;
//   - e.TargetPort is zero, i.e. the target instance is not created on the
//     same pod.
func (e *Engine) isStandbyTargetCreationRequired() bool {
	initiatorAlreadyCreated := e.Port != 0
	noLocalTarget := e.TargetPort == 0

	return initiatorAlreadyCreated && noLocalTarget
}

func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string,
initiatorCreationRequired, targetCreationRequired bool) (err error) {
if !types.IsFrontendSupported(e.Frontend) {
Expand All @@ -415,6 +427,8 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc
return nil
}

standbyTargetCreationRequired := e.isStandbyTargetCreationRequired()

targetIP, targetPort, err := splitHostPort(targetAddress)
if err != nil {
return err
Expand All @@ -432,14 +446,16 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc

defer func() {
if err == nil {
e.initiator = initiator
e.dmDeviceIsBusy = dmDeviceIsBusy
e.Endpoint = initiator.GetEndpoint()
e.log = e.log.WithFields(logrus.Fields{
"endpoint": e.Endpoint,
"port": e.Port,
"targetPort": e.TargetPort,
})
if !standbyTargetCreationRequired {
e.initiator = initiator
e.dmDeviceIsBusy = dmDeviceIsBusy
e.Endpoint = initiator.GetEndpoint()
e.log = e.log.WithFields(logrus.Fields{
"endpoint": e.Endpoint,
"port": e.Port,
"targetPort": e.TargetPort,
})
}

e.log.Infof("Finished handling frontend for engine: %+v", e)
}
Expand Down Expand Up @@ -491,7 +507,11 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc
e.Port = port
}
if targetCreationRequired {
e.TargetPort = port
if standbyTargetCreationRequired {
e.StandbyTargetPort = port
} else {
e.TargetPort = port
}
}

if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil {
Expand Down Expand Up @@ -650,6 +670,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) {
Port: e.Port,
TargetIp: e.TargetIP,
TargetPort: e.TargetPort,
StandbyTargetPort: e.StandbyTargetPort,
Snapshots: map[string]*spdkrpc.Lvol{},
Frontend: e.Frontend,
Endpoint: e.Endpoint,
Expand Down Expand Up @@ -2270,7 +2291,9 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddres

if newTargetIP == podIP {
e.TargetPort = newTargetPort
e.StandbyTargetPort = 0
} else {
e.StandbyTargetPort = e.TargetPort
e.TargetPort = 0
}

Expand Down Expand Up @@ -2411,16 +2434,16 @@ func (e *Engine) connectTarget(targetAddress string) error {

// DeleteTarget deletes the target instance
func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) {
e.log.Infof("Deleting target with target port %d", e.TargetPort)
e.log.Infof("Deleting target with target port %d and standby target port %d", e.TargetPort, e.StandbyTargetPort)

err = spdkClient.StopExposeBdev(e.Nqn)
if err != nil {
return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name)
}

err = e.releaseTargetPort(superiorPortAllocator)
err = e.releaseTargetAndStandbyTargetPorts(superiorPortAllocator)
if err != nil {
return errors.Wrapf(err, "failed to release target port while deleting target instance for engine %s", e.Name)
return errors.Wrapf(err, "failed to release target and standby target ports while deleting target instance for engine %s", e.Name)
}

e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name)
Expand All @@ -2446,8 +2469,9 @@ func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool
return oldTargetAddress != newTargetAddress
}

func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) error {
func (e *Engine) releaseTargetAndStandbyTargetPorts(superiorPortAllocator *commonbitmap.Bitmap) error {
releaseTargetPortRequired := e.TargetPort != 0
releaseStandbyTargetPortRequired := e.StandbyTargetPort != 0 && e.StandbyTargetPort != e.TargetPort

// Release the target port
if releaseTargetPortRequired {
Expand All @@ -2457,5 +2481,13 @@ func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) e
}
e.TargetPort = 0

// Release the standby target port
if releaseStandbyTargetPortRequired {
if err := superiorPortAllocator.ReleaseRange(e.StandbyTargetPort, e.StandbyTargetPort); err != nil {
return errors.Wrapf(err, "failed to release standby target port %d", e.StandbyTargetPort)
}
}
e.StandbyTargetPort = 0

return nil
}
Loading

0 comments on commit 79de939

Please sign in to comment.