diff --git a/pkg/api/types.go b/pkg/api/types.go index cb30a325..f5ed96cd 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1,9 +1,10 @@ package api import ( - "github.com/longhorn/types/pkg/generated/spdkrpc" "google.golang.org/protobuf/types/known/emptypb" + "github.com/longhorn/types/pkg/generated/spdkrpc" + "github.com/longhorn/longhorn-spdk-engine/pkg/types" ) @@ -129,6 +130,7 @@ type Engine struct { Port int32 `json:"port"` TargetIP string `json:"target_ip"` TargetPort int32 `json:"target_port"` + StandbyTargetPort int32 `json:"standby_target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -149,6 +151,7 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { Port: e.Port, TargetIP: e.TargetIp, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 65a9119f..fc7962e3 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -37,18 +37,19 @@ import ( type Engine struct { sync.RWMutex - Name string - VolumeName string - SpecSize uint64 - ActualSize uint64 - IP string - Port int32 // Port that initiator is connecting to - TargetIP string - TargetPort int32 // Port of the target that is used for letting initiator connect to - Frontend string - Endpoint string - Nqn string - Nguid string + Name string + VolumeName string + SpecSize uint64 + ActualSize uint64 + IP string + Port int32 // Port that initiator is connecting to + TargetIP string + TargetPort int32 // Port of the target that is used for letting initiator connect to + StandbyTargetPort int32 + Frontend string + Endpoint string + Nqn string + Nguid string ctrlrLossTimeout int fastIOFailTimeoutSec int @@ -115,7 +116,7 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } func (e *Engine) isNewEngine() bool { - return e.IP == "" && e.TargetIP == "" + return e.IP == "" && e.TargetIP == "" && e.StandbyTargetPort == 0 } func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, targetIP string) (bool, bool, error) { @@ -132,7 +133,11 @@ func (e *Engine) checkInitiatorAndTargetCreationRequirements(podIP, initiatorIP, } else if e.Port != 0 && e.TargetPort == 0 { // Only target instance creation is required, because the initiator instance is already running e.log.Info("Creating a target instance") - targetCreationRequired = true + if e.StandbyTargetPort != 0 { + e.log.Warnf("Standby target instance with port %v is already created, will skip the target creation", e.StandbyTargetPort) + } else { + targetCreationRequired = true + } } else { e.log.Infof("Initiator instance with port %v and target instance with port %v are already created, will skip the creation", e.Port, e.TargetPort) } @@ -404,6 +409,13 @@ func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (m return filteredCandidates, nil } +func (e *Engine) isStandbyTargetCreationRequired() bool { + // e.Port is non-zero which means the initiator instance is already created and connected to a target instance. + // e.TargetPort is zero which means the target instance is not created on the same pod. + // Thus, a standby target instance should be created for the target instance switch-over. + return e.Port != 0 && e.TargetPort == 0 +} + func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap, portCount int32, targetAddress string, initiatorCreationRequired, targetCreationRequired bool) (err error) { if !types.IsFrontendSupported(e.Frontend) { @@ -415,6 +427,8 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc return nil } + standbyTargetCreationRequired := e.isStandbyTargetCreationRequired() + targetIP, targetPort, err := splitHostPort(targetAddress) if err != nil { return err @@ -432,14 +446,16 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc defer func() { if err == nil { - e.initiator = initiator - e.dmDeviceIsBusy = dmDeviceIsBusy - e.Endpoint = initiator.GetEndpoint() - e.log = e.log.WithFields(logrus.Fields{ - "endpoint": e.Endpoint, - "port": e.Port, - "targetPort": e.TargetPort, - }) + if !standbyTargetCreationRequired { + e.initiator = initiator + e.dmDeviceIsBusy = dmDeviceIsBusy + e.Endpoint = initiator.GetEndpoint() + e.log = e.log.WithFields(logrus.Fields{ + "endpoint": e.Endpoint, + "port": e.Port, + "targetPort": e.TargetPort, + }) + } e.log.Infof("Finished handling frontend for engine: %+v", e) } @@ -491,7 +507,11 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, superiorPortAlloc e.Port = port } if targetCreationRequired { - e.TargetPort = port + if standbyTargetCreationRequired { + e.StandbyTargetPort = port + } else { + e.TargetPort = port + } } if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil { @@ -650,6 +670,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Port: e.Port, TargetIp: e.TargetIP, TargetPort: e.TargetPort, + StandbyTargetPort: e.StandbyTargetPort, Snapshots: map[string]*spdkrpc.Lvol{}, Frontend: e.Frontend, Endpoint: e.Endpoint, @@ -2270,7 +2291,9 @@ func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, newTargetAddres if newTargetIP == podIP { e.TargetPort = newTargetPort + e.StandbyTargetPort = 0 } else { + e.StandbyTargetPort = e.TargetPort e.TargetPort = 0 } @@ -2411,16 +2434,16 @@ func (e *Engine) connectTarget(targetAddress string) error { // DeleteTarget deletes the target instance func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *commonbitmap.Bitmap) (err error) { - e.log.Infof("Deleting target with target port %d", e.TargetPort) + e.log.Infof("Deleting target with target port %d and standby target port %d", e.TargetPort, e.StandbyTargetPort) err = spdkClient.StopExposeBdev(e.Nqn) if err != nil { return errors.Wrapf(err, "failed to stop expose bdev while deleting target instance for engine %s", e.Name) } - err = e.releaseTargetPort(superiorPortAllocator) + err = e.releaseTargetAndStandbyTargetPorts(superiorPortAllocator) if err != nil { - return errors.Wrapf(err, "failed to release target port while deleting target instance for engine %s", e.Name) + return errors.Wrapf(err, "failed to release target and standby target ports while deleting target instance for engine %s", e.Name) } e.log.Infof("Deleting raid bdev %s while deleting target instance", e.Name) @@ -2446,8 +2469,9 @@ func isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool return oldTargetAddress != newTargetAddress } -func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) error { +func (e *Engine) releaseTargetAndStandbyTargetPorts(superiorPortAllocator *commonbitmap.Bitmap) error { releaseTargetPortRequired := e.TargetPort != 0 + releaseStandbyTargetPortRequired := e.StandbyTargetPort != 0 && e.StandbyTargetPort != e.TargetPort // Release the target port if releaseTargetPortRequired { @@ -2457,5 +2481,13 @@ func (e *Engine) releaseTargetPort(superiorPortAllocator *commonbitmap.Bitmap) e } e.TargetPort = 0 + // Release the standby target port + if releaseStandbyTargetPortRequired { + if err := superiorPortAllocator.ReleaseRange(e.StandbyTargetPort, e.StandbyTargetPort); err != nil { + return errors.Wrapf(err, "failed to release standby target port %d", e.StandbyTargetPort) + } + } + e.StandbyTargetPort = 0 + return nil } diff --git a/pkg/spdk/engine_test.go b/pkg/spdk/engine_test.go new file mode 100644 index 00000000..4780430f --- /dev/null +++ b/pkg/spdk/engine_test.go @@ -0,0 +1,233 @@ +package spdk + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + commonbitmap "github.com/longhorn/go-common-libs/bitmap" + + . "gopkg.in/check.v1" +) + +func (s *TestSuite) TestCheckInitiatorAndTargetCreationRequirements(c *C) { + testCases := []struct { + name string + podIP string + initiatorIP string + targetIP string + port int32 + targetPort int32 + standbyTargetPort int32 + expectedInitiatorCreationRequired bool + expectedTargetCreationRequired bool + expectedError error + }{ + { + name: "Create both initiator and target instances", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local target instance on the node with initiator instance", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 8080, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Create local initiator instance only", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: true, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + { + name: "Create local target instance on the node without initiator instance", + podIP: "192.168.1.2", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.2", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: true, + expectedError: nil, + }, + { + name: "Invalid initiator and target addresses", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.2", + targetIP: "192.168.1.3", + port: 0, + targetPort: 0, + standbyTargetPort: 0, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: fmt.Errorf("invalid initiator and target addresses for engine test-engine creation with initiator address 192.168.1.2 and target address 192.168.1.3"), + }, + { + name: "Standby target instance is already created", + podIP: "192.168.1.1", + initiatorIP: "192.168.1.1", + targetIP: "192.168.1.1", + port: 100, + targetPort: 0, + standbyTargetPort: 105, + expectedInitiatorCreationRequired: false, + expectedTargetCreationRequired: false, + expectedError: nil, + }, + } + for testName, testCase := range testCases { + c.Logf("testing checkInitiatorAndTargetCreationRequirements.%v", testName) + + engine := &Engine{ + Port: testCase.port, + TargetPort: testCase.targetPort, + StandbyTargetPort: testCase.standbyTargetPort, + Name: "test-engine", + log: logrus.New(), + } + + initiatorCreationRequired, targetCreationRequired, err := engine.checkInitiatorAndTargetCreationRequirements(testCase.podIP, testCase.initiatorIP, testCase.targetIP) + + c.Assert(initiatorCreationRequired, Equals, testCase.expectedInitiatorCreationRequired, + Commentf("Test case '%s': unexpected initiator creation requirement", testCase.name)) + c.Assert(targetCreationRequired, Equals, testCase.expectedTargetCreationRequired, + Commentf("Test case '%s': unexpected target creation requirement", testCase.name)) + c.Assert(err, DeepEquals, testCase.expectedError, + Commentf("Test case '%s': unexpected error result", testCase.name)) + } +} + +func (s *TestSuite) TestIsNewEngine(c *C) { + testCases := []struct { + name string + engine *Engine + expected bool + }{ + { + name: "New engine with empty IP and TargetIP and StandbyTargetPort 0", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: true, + }, + { + name: "Engine with non-empty IP", + engine: &Engine{ + IP: "192.168.1.1", + TargetIP: "", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-empty TargetIP", + engine: &Engine{ + IP: "", + TargetIP: "192.168.1.2", + StandbyTargetPort: 0, + }, + expected: false, + }, + { + name: "Engine with non-zero StandbyTargetPort", + engine: &Engine{ + IP: "", + TargetIP: "", + StandbyTargetPort: 8080, + }, + expected: false, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing isNewEngine.%v", testName) + result := testCase.engine.isNewEngine() + c.Assert(result, Equals, testCase.expected, Commentf("Test case '%s': unexpected result", testCase.name)) + } +} + +func (s *TestSuite) TestReleaseTargetAndStandbyTargetPorts(c *C) { + testCases := []struct { + name string + engine *Engine + expectedTargetPort int32 + expectedStandbyTargetPort int32 + expectedError error + }{ + { + name: "Release both target and standby target ports", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2005, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target port only but standby target port is not set", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 0, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release target and standby ports when they are the same", + engine: &Engine{ + TargetPort: 2000, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + { + name: "Release snapshot target port only", + engine: &Engine{ + TargetPort: 0, + StandbyTargetPort: 2000, + }, + expectedTargetPort: 0, + expectedStandbyTargetPort: 0, + expectedError: nil, + }, + } + + for testName, testCase := range testCases { + c.Logf("testing releaseTargetAndStandbyTargetPorts.%v", testName) + + bitmap, err := commonbitmap.NewBitmap(0, 100000) + c.Assert(err, IsNil) + + err = testCase.engine.releaseTargetAndStandbyTargetPorts(bitmap) + c.Assert(err, DeepEquals, testCase.expectedError, Commentf("Test case '%s': unexpected error result", testCase.name)) + c.Assert(testCase.engine.TargetPort, Equals, testCase.expectedTargetPort, Commentf("Test case '%s': unexpected target port", testCase.name)) + c.Assert(testCase.engine.StandbyTargetPort, Equals, testCase.expectedStandbyTargetPort, Commentf("Test case '%s': unexpected standby target port", testCase.name)) + } +}