From 3c6c4a38a87372ac8131f606b9a1051df98c423e Mon Sep 17 00:00:00 2001 From: Derek Su Date: Mon, 18 Nov 2024 23:49:58 +0800 Subject: [PATCH] feat(v2 upgrade): support engine live upgrade Longhorn 9104 Signed-off-by: Derek Su --- pkg/api/types.go | 2 + pkg/spdk/engine.go | 322 ++++++++++++++++++++++++---------------- pkg/spdk/engine_test.go | 115 ++++++++++++++ 3 files changed, 307 insertions(+), 132 deletions(-) create mode 100644 pkg/spdk/engine_test.go diff --git a/pkg/api/types.go b/pkg/api/types.go index cb30a325..1f600307 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -129,6 +129,7 @@ type Engine struct { Port int32 `json:"port"` TargetIP string `json:"target_ip"` TargetPort int32 `json:"target_port"` + StandbyTargetPort int32 `json:"standby_target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -149,6 +150,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { Port: e.Port, TargetIP: e.TargetIp, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 6c79c7ff..f52b5ca9 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -34,18 +34,19 @@ import ( type Engine struct { sync.RWMutex - Name string - VolumeName string - SpecSize uint64 - ActualSize uint64 - IP string - Port int32 - TargetIP string - TargetPort int32 - Frontend string - Endpoint string - Nqn string - Nguid string + Name string + VolumeName string + SpecSize uint64 + ActualSize uint64 + IP string + Port int32 // Port that initiator is connecting to + TargetIP string + TargetPort int32 // Port of the target that is used for letting initiator connect to + StandbyTargetPort int32 + Frontend string + Endpoint string + Nqn string + Nguid string ReplicaStatusMap map[string]*EngineReplicaStatus @@ -104,6 +105,42 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } +func isNewEngine(e *Engine) bool { + return e.IP == "" +} + +func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { + initiatorCreationRequired, targetCreationRequired := false, false + var err error + + if podIP == initiatorIP && podIP == targetIP { + if e.Port == 0 && e.TargetPort == 0 { + e.log.Info("Creating both initiator and target instances") + initiatorCreationRequired = true + targetCreationRequired = true + } else if e.Port != 0 && e.TargetPort == 0 { + e.log.Info("Creating a target instance") + if e.StandbyTargetPort != 0 { + e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort) + } else { + targetCreationRequired = true + } + } else { + err = fmt.Errorf("invalid initiator and target address for engine %s creation", e.Name) + } + } else if podIP == initiatorIP { + e.log.Info("Creating an initiator instance") + initiatorCreationRequired = true + } else if podIP == targetIP { + e.log.Info("Creating a target instance") + targetCreationRequired = true + } else { + err = fmt.Errorf("invalid initiator and target address for engine %s creation", e.Name) + } + + return initiatorCreationRequired, targetCreationRequired, err +} + func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, @@ -119,7 +156,6 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.Lock() defer func() { e.Unlock() - if requireUpdate { e.UpdateCh <- nil } @@ -134,6 +170,7 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str if err != nil { return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) } + targetIP, _, err := splitHostPort(targetAddress) if err != nil { return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress) @@ -168,29 +205,28 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str replicaBdevList := []string{} - initiatorCreationRequired := true - if !upgradeRequired { - if e.IP == "" { - if initiatorIP != targetIP { - // For creating target on another node - initiatorCreationRequired = false - e.log.Info("Creating an target engine") - e.TargetIP = podIP - } else { - // For newly creating engine - e.log.Info("Creating an new engine") - e.IP = podIP - e.TargetIP = podIP - } + initiatorCreationRequired, targetCreationRequired, err := e.checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP) + if err != nil { + return nil, err + } + if !initiatorCreationRequired && !targetCreationRequired { + return e.getWithoutLock(), nil + } - e.log = e.log.WithField("ip", e.IP) - } else { - if initiatorIP != targetIP { - return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress) - } + if isNewEngine(e) { + e.IP = initiatorIP + e.TargetIP = targetIP + } - // For creating target on attached node - initiatorCreationRequired = false + e.log = e.log.WithFields(logrus.Fields{ + "initiatorIP": e.IP, + "targetIP": e.TargetIP, + }) + + if targetCreationRequired { + _, err := spdkClient.BdevRaidGet(e.Name, 0) + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to get raid bdev %v during engine creation", e.Name) } if salvageRequested { @@ -222,18 +258,15 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) + // TODO: improve the log message + e.log.Infof("Connecting all available replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } } else { - // For reconstructing engine after switching over target to another node - initiatorCreationRequired = false - - e.IP = targetIP + e.log.Info("Skipping target creation during engine creation") - // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKServiceAddress := net.JoinHostPort(e.TargetIP, strconv.Itoa(types.SPDKServicePort)) targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err @@ -244,21 +277,17 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } }() - var engineWithTarget *api.Engine - if initiatorIP != targetIP { - engineWithTarget, err = targetSPDKClient.EngineGet(e.Name) - if err != nil { - return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) - } - } else { - engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock()) + e.log.Info("Fetching replica list from target engine") + targetEngine, err := targetSPDKClient.EngineGet(e.Name) + if err != nil { + return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress) } for replicaName, replicaAddr := range replicaAddressMap { e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ Address: replicaAddr, } - if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + if _, ok := targetEngine.ReplicaAddressMap[replicaName]; !ok { e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { @@ -267,12 +296,21 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.ReplicaStatusMap[replicaName].BdevName = replicaName } } - e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) - e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) + + e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log.Infof("Connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } - e.log.Info("Launching frontend during engine creation") - if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil { + log := e.log.WithFields(logrus.Fields{ + "initiatorCreationRequired": initiatorCreationRequired, + "targetCreationRequired": targetCreationRequired, + "upgradeRequired": upgradeRequired, + "initiatorAddress": initiatorAddress, + "targetAddress": targetAddress, + }) + + log.Info("Handling frontend during engine creation") + if err := e.handleFrontend(spdkClient, superiorPortAllocator, portCount, targetAddress, initiatorCreationRequired, targetCreationRequired); err != nil { return nil, err } @@ -357,7 +395,8 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } -func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { +func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, + initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) } @@ -367,9 +406,9 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - initiatorIP, _, err := splitHostPort(initiatorAddress) - if err != nil { - return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress) + standbyTargetCreationRequired := false + if e.Port != 0 && e.TargetPort == 0 { + standbyTargetCreationRequired = true } targetIP, targetPort, err := splitHostPort(targetAddress) @@ -378,34 +417,87 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, } e.Nqn = helpertypes.GetNQN(e.Name) + e.Nguid = commonutils.RandomID(nvmeNguidLength) - var port int32 - if !upgradeRequired { - e.Nguid = commonutils.RandomID(nvmeNguidLength) + dmDeviceBusy := false + port := int32(0) + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) + } - e.log.Info("Blindly stopping expose bdev for engine") - if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { - return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + defer func() { + if err == nil { + if !standbyTargetCreationRequired { + e.initiator = initiator + e.dmDeviceBusy = dmDeviceBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + } + + e.log.Infof("Finished handling frontend for engine: %+v", e) } + }() - port, _, err = superiorPortAllocator.AllocateRange(portCount) - if err != nil { - return err + if initiatorCreationRequired && !targetCreationRequired { + initiator.TransportAddress = targetIP + initiator.TransportServiceID = strconv.Itoa(int(targetPort)) + + e.log.Infof("Target instance already exists on %v, no need to create target instance", targetAddress) + e.Port = targetPort + + // TODO: + // "nvme list -o json" might be empty devices for a while instance manager pod is just started. + // The root cause is not clear, so we need to retry to load NVMe device info. + for r := 0; r < maxNumRetries; r++ { + err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) + if err != nil && strings.Contains(err.Error(), "failed to get devices") { + time.Sleep(retryInterval) + continue + } + if err == nil { + e.log.Infof("Loaded NVMe device info for engine") + break + } + return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) } - e.log.Infof("Allocated port %v", port) - if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { - return err + err = initiator.LoadEndpoint(false) + if err != nil { + return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) } - if initiatorCreationRequired { - e.Port = port - e.TargetPort = port + return nil + } + + e.log.Info("Blindly stopping expose bdev for engine") + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name) + } + + port, _, err = superiorPortAllocator.AllocateRange(portCount) + if err != nil { + return errors.Wrapf(err, "failed to allocate port for engine %v", e.Name) + } + e.log.Infof("Allocated port %v for engine", port) + + if initiatorCreationRequired { + e.Port = port + } + if targetCreationRequired { + if standbyTargetCreationRequired { + e.StandbyTargetPort = port } else { e.TargetPort = port } - } else { - e.Port = targetPort + } + + if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { + return err } if e.Frontend == types.FrontendSPDKTCPNvmf { @@ -413,50 +505,17 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, return nil } - if initiatorIP != targetIP && !upgradeRequired { - e.log.Infof("Initiator IP %v is different from target IP %s, will not start initiator for engine", initiatorIP, targetIP) + if !initiatorCreationRequired && targetCreationRequired { + e.log.Infof("Only creating target instance for engine, no need to start initiator") return nil } - initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) - if err != nil { - return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name) - } + e.log.Info("Starting initiator for engine") - dmDeviceBusy := false - if initiatorCreationRequired { - e.log.Info("Starting initiator for engine") - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - e.log.Info("Loading NVMe device info for engine") - err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN) - if err != nil { - if nvme.IsValidNvmeDeviceNotFound(err) { - dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true) - if err != nil { - return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) - } - } else { - return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name) - } - } - err = initiator.LoadEndpoint(false) - if err != nil { - return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name) - } - //dmDeviceBusy = true + dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true) + if err != nil { + return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name) } - e.initiator = initiator - e.dmDeviceBusy = dmDeviceBusy - e.Endpoint = initiator.GetEndpoint() - - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": port, - }) return nil } @@ -564,6 +623,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Port: e.Port, TargetIp: e.TargetIP, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, Snapshots: map[string]*spdkrpc.Lvol{}, Frontend: e.Frontend, Endpoint: e.Endpoint, @@ -630,16 +690,12 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { } }() - podIP, err := commonnet.GetIPForPod() - if err != nil { - return err - } - if e.IP != podIP { - // Skip the validation if the engine is being upgraded - if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) { - return nil - } - return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name) + // podIP, err := commonnet.GetIPForPod() + // if err != nil { + // return err + // } + if e.IP != e.TargetIP { + return nil } if err := e.validateAndUpdateFrontend(subsystemMap); err != nil { @@ -2122,6 +2178,11 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s e.UpdateCh <- nil }() + podIP, err := commonnet.GetIPForPod() + if err != nil { + return err + } + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) if err != nil { return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name) @@ -2164,6 +2225,11 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress s e.IP = targetIP e.Port = targetPort + if targetIP == podIP { + e.TargetPort = targetPort + e.StandbyTargetPort = 0 + } + e.log.Info("Reloading device mapper after target switchover") if err := e.reloadDevice(); err != nil { return err @@ -2333,11 +2399,3 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { return oldTargetAddress != newTargetAddress } - -func engineOnlyContainsInitiator(e *Engine) bool { - return e.Port != 0 && e.TargetPort == 0 -} - -func engineOnlyContainsTarget(e *Engine) bool { - return e.Port == 0 && e.TargetPort != 0 -} diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go new file mode 100644 index 00000000..c9e303aa --- /dev/null +++ b/pkg/spdk/engine_test.go @@ -0,0 +1,115 @@ +package spdk + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + . "gopkg.in/check.v1" +) + +func (s *TestSuite) TestCheckInitiatorAndTargetCreationRequirements(c *C) { + testCases := []struct { + name string + podIP string + initiatorIP string + targetIP string + port int32 + targetPort int32 + standbyTargetPort int32 + expectedInitiatorCreationRequired bool + expectedTargetCreationRequired bool + expectedError error + }{ + { + name: "Create both initiator and target instances", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local target instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 8080, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local initiator instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + { + name: "Create remote target instance only", + podIP: "192.168.1.2", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Invalid initiator and target address", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.2", + targetIP: "192.168.1.3", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: fmt.Errorf("invalid initiator and target address for engine %s creation", "test-engine"), + }, + { + name: "Standby target instance is already created", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 100, + targetPort: 0, + standbyTargetPort: 105, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing TestCheckInitiatorAndTargetCreationRequirements.%v", testName) + + engine := &Engine{ + Port: testCase.port, + TargetPort: testCase.targetPort, + StandbyTargetPort: testCase.standbyTargetPort, + Name: "test-engine", + log: logrus.New(), + } + + initiatorCreationRequired, targetCreationRequired, err := engine.checkInitiatorAndTargetCreationRequirements(testCase.podIP, testCase.initiatorIP, testCase.targetIP) + + c.Assert(initiatorCreationRequired, Equals, testCase.expectedInitiatorCreationRequired) + c.Assert(targetCreationRequired, Equals, testCase.expectedTargetCreationRequired) + c.Assert(err, DeepEquals, testCase.expectedError) + } +}