feat: support v2 volume live migration #3323

Merged 1 commit on Dec 14, 2024
63 changes: 49 additions & 14 deletions controller/volume_controller.go
@@ -279,270 +279,270 @@
return log
}

func (c *VolumeController) syncVolume(key string) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to sync %v", key)
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if namespace != c.namespace {
// Not ours, don't do anything
return nil
}

volume, err := c.ds.GetVolume(name)
if err != nil {
if datastore.ErrorIsNotFound(err) {
return nil
}
return err
}

log := getLoggerForVolume(c.logger, volume)

defaultEngineImage, err := c.ds.GetSettingValueExisted(types.SettingNameDefaultEngineImage)
if err != nil {
return err
}
isResponsible, err := c.isResponsibleFor(volume, defaultEngineImage)
if err != nil {
return err
}
if !isResponsible {
return nil
}

if volume.Status.OwnerID != c.controllerID {
volume.Status.OwnerID = c.controllerID
volume, err = c.ds.UpdateVolumeStatus(volume)
if err != nil {
// we don't mind others coming first
if apierrors.IsConflict(errors.Cause(err)) {
return nil
}
return err
}
log.Infof("Volume got new owner %v", c.controllerID)
}

engines, err := c.ds.ListVolumeEngines(volume.Name)
if err != nil {
return err
}
replicas, err := c.ds.ListVolumeReplicas(volume.Name)
if err != nil {
return err
}
snapshots, err := c.ds.ListVolumeSnapshotsRO(volume.Name)
if err != nil {
return err
}

if volume.DeletionTimestamp != nil {
if volume.Status.State != longhorn.VolumeStateDeleting {
volume.Status.State = longhorn.VolumeStateDeleting
volume, err = c.ds.UpdateVolumeStatus(volume)
if err != nil {
return err
}
c.eventRecorder.Eventf(volume, corev1.EventTypeNormal, constant.EventReasonDelete, "Deleting volume %v", volume.Name)
}

if volume.Spec.AccessMode == longhorn.AccessModeReadWriteMany {
log.Info("Removing share manager for deleted volume")
if err := c.ds.DeleteShareManager(volume.Name); err != nil && !datastore.ErrorIsNotFound(err) {
return err
}
}

for _, snap := range snapshots {
if snap.DeletionTimestamp == nil {
if err := c.ds.DeleteSnapshot(snap.Name); err != nil {
return err
}
}
}
for _, e := range engines {
if e.DeletionTimestamp == nil {
if err := c.ds.DeleteEngine(e.Name); err != nil {
return err
}
}
}
if types.IsDataEngineV2(volume.Spec.DataEngine) {
// To prevent the "no such device" error in spdk_tgt,
// remove the raid bdev before tearing down the replicas.
engines, err := c.ds.ListVolumeEnginesRO(volume.Name)
if err != nil {
return err
}
if len(engines) > 0 {
replicaDeleted := false
for _, r := range replicas {
if r.DeletionTimestamp != nil {
replicaDeleted = true
break
}
}
if !replicaDeleted {
c.logger.Infof("Volume (%s) %v still has engines %v, so skip deleting its replicas until the engines have been deleted",
volume.Spec.DataEngine, volume.Name, engines)
return nil
}
}
}

for _, r := range replicas {
if r.DeletionTimestamp == nil {
if err := c.ds.DeleteReplica(r.Name); err != nil {
return err
}
}
}

kubeStatus := volume.Status.KubernetesStatus

if kubeStatus.PVName != "" {
if err := c.ds.DeletePersistentVolume(kubeStatus.PVName); err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}
}
}

if kubeStatus.PVCName != "" && kubeStatus.LastPVCRefAt == "" {
if err := c.ds.DeletePersistentVolumeClaim(kubeStatus.Namespace, kubeStatus.PVCName); err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}
}
}
vaName := types.GetLHVolumeAttachmentNameFromVolumeName(volume.Name)
if err := c.ds.DeleteLHVolumeAttachment(vaName); err != nil && !apierrors.IsNotFound(err) {
return err
}

// now volumeattachment, snapshots, replicas, and engines have been marked for deletion
if engines, err := c.ds.ListVolumeEnginesRO(volume.Name); err != nil {
return err
} else if len(engines) > 0 {
return nil
}
if replicas, err := c.ds.ListVolumeReplicasRO(volume.Name); err != nil {
return err
} else if len(replicas) > 0 {
return nil
}

// now snapshots, replicas, and engines are deleted
return c.ds.RemoveFinalizerForVolume(volume)
}

existingVolume := volume.DeepCopy()
existingEngines := map[string]*longhorn.Engine{}
for k, e := range engines {
existingEngines[k] = e.DeepCopy()
}
existingReplicas := map[string]*longhorn.Replica{}
for k, r := range replicas {
existingReplicas[k] = r.DeepCopy()
}
defer func() {
var lastErr error
// create/delete engine/replica has been handled already
// so we only need to worry about entries in the current list
for k, r := range replicas {
if existingReplicas[k] == nil ||
!reflect.DeepEqual(existingReplicas[k].Spec, r.Spec) {
if _, err := c.ds.UpdateReplica(r); err != nil {
lastErr = err
}
}
}
// stop updating if replicas weren't fully updated
if lastErr == nil {
for k, e := range engines {
if existingEngines[k] == nil ||
!reflect.DeepEqual(existingEngines[k].Spec, e.Spec) {
if _, err := c.ds.UpdateEngine(e); err != nil {
lastErr = err
}
}
}
}
// stop updating if engines and replicas weren't fully updated
if lastErr == nil {
// Make sure that we don't update the condition's LastTransitionTime if the condition's values haven't changed
handleConditionLastTransitionTime(&existingVolume.Status, &volume.Status)
if !reflect.DeepEqual(existingVolume.Status, volume.Status) {
_, lastErr = c.ds.UpdateVolumeStatus(volume)
}
}
if err == nil {
err = lastErr
}
// requeue if it's conflict
if apierrors.IsConflict(errors.Cause(err)) {
log.Debugf("Requeue volume due to error %v", err)
c.enqueueVolume(volume)
err = nil
}
}()

if err := c.handleVolumeAttachmentCreation(volume); err != nil {
return err
}

if err := c.ReconcileEngineReplicaState(volume, engines, replicas); err != nil {
return err
}

if err := c.syncVolumeUnmapMarkSnapChainRemovedSetting(volume, engines, replicas); err != nil {
return err
}

if err := c.syncVolumeSnapshotSetting(volume, engines, replicas); err != nil {
return err
}

if err := c.updateRecurringJobs(volume); err != nil {
return err
}

if err := c.upgradeEngineForVolume(volume, engines, replicas); err != nil {
return err
}

if err := c.processMigration(volume, engines, replicas); err != nil {
return err
}

if err := c.ReconcilePersistentVolume(volume); err != nil {
return err
}

if err := c.ReconcileShareManagerState(volume); err != nil {
return err
}

if err := c.ReconcileBackupVolumeState(volume); err != nil {
return err
}

if err := c.ReconcileVolumeState(volume, engines, replicas); err != nil {
return err
}

if err := c.cleanupReplicas(volume, engines, replicas); err != nil {
return err
}

return nil
}
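Editor's note: the deferred update block above calls handleConditionLastTransitionTime so that re-setting a condition with identical values does not bump its LastTransitionTime; the helper's body is collapsed in this diff view. The following is a minimal illustrative sketch of the idea only, not the upstream implementation, and the local condition struct and preserveTransitionTime name are made up:

// Sketch only: keep the previous LastTransitionTime when a condition is
// re-set with the same Status, Reason, and Message.
type condition struct {
	Type, Status, Reason, Message string
	LastTransitionTime            string
}

func preserveTransitionTime(existing, updated []condition) {
	for i, newCond := range updated {
		for _, oldCond := range existing {
			if oldCond.Type != newCond.Type {
				continue
			}
			if oldCond.Status == newCond.Status &&
				oldCond.Reason == newCond.Reason &&
				oldCond.Message == newCond.Message {
				// Nothing but the timestamp would change, so carry the old one forward.
				updated[i].LastTransitionTime = oldCond.LastTransitionTime
			}
			break
		}
	}
}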

// handleConditionLastTransitionTime rolls back to the existing condition object if the condition's values haven't changed

CodeFactor warning (controller/volume_controller.go#L282-L545): Very Complex Method
func handleConditionLastTransitionTime(existingStatus, newStatus *longhorn.VolumeStatus) {
for i, newCondition := range newStatus.Conditions {
for _, existingCondition := range existingStatus.Conditions {
@@ -613,247 +613,247 @@

// ReconcileEngineReplicaState will get the current main engine e.Status.ReplicaModeMap, e.Status.RestoreStatus,
// e.Status.purgeStatus, and e.Status.SnapshotCloneStatus then update v and rs accordingly.
func (c *VolumeController) ReconcileEngineReplicaState(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to reconcile engine/replica state for %v", v.Name)
if v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
v.Status.LastDegradedAt = ""
}
}()

// Aggregate replica wait for backing image condition
aggregatedReplicaWaitForBackingImageError := util.NewMultiError()
waitForBackingImage := false
for _, r := range rs {
waitForBackingImageCondition := types.GetCondition(r.Status.Conditions, longhorn.ReplicaConditionTypeWaitForBackingImage)
if waitForBackingImageCondition.Status == longhorn.ConditionStatusTrue {
waitForBackingImage = true
if waitForBackingImageCondition.Reason == longhorn.ReplicaConditionReasonWaitForBackingImageFailed {
aggregatedReplicaWaitForBackingImageError.Append(util.NewMultiError(waitForBackingImageCondition.Message))
}
}
}
if waitForBackingImage {
if len(aggregatedReplicaWaitForBackingImageError) > 0 {
failureMessage := aggregatedReplicaWaitForBackingImageError.Join()
v.Status.Conditions = types.SetCondition(v.Status.Conditions, longhorn.VolumeConditionTypeWaitForBackingImage,
longhorn.ConditionStatusTrue, longhorn.VolumeConditionReasonWaitForBackingImageFailed, failureMessage)
} else {
v.Status.Conditions = types.SetCondition(v.Status.Conditions, longhorn.VolumeConditionTypeWaitForBackingImage,
longhorn.ConditionStatusTrue, longhorn.VolumeConditionReasonWaitForBackingImageWaiting, "")
}
} else {
v.Status.Conditions = types.SetCondition(v.Status.Conditions, longhorn.VolumeConditionTypeWaitForBackingImage,
longhorn.ConditionStatusFalse, "", "")
}

e, err := c.ds.PickVolumeCurrentEngine(v, es)
if err != nil {
return err
}
if e == nil {
return nil
}

log := getLoggerForVolume(c.logger, v).WithField("currentEngine", e.Name)

if e.Status.CurrentState == longhorn.InstanceStateUnknown {
if v.Status.Robustness != longhorn.VolumeRobustnessUnknown {
v.Status.Robustness = longhorn.VolumeRobustnessUnknown
c.eventRecorder.Eventf(v, corev1.EventTypeWarning, constant.EventReasonUnknown, "volume %v robustness is unknown", v.Name)
}
return nil
}
if e.Status.CurrentState != longhorn.InstanceStateRunning {
// If a replica failed at the attaching stage before the engine became running,
// there is no record in e.Status.ReplicaModeMap
engineInstanceCreationCondition := types.GetCondition(e.Status.Conditions, longhorn.InstanceConditionTypeInstanceCreation)
isNoAvailableBackend := strings.Contains(engineInstanceCreationCondition.Message, fmt.Sprintf("exit status %v", int(syscall.ENODATA)))
for _, r := range rs {
if isNoAvailableBackend || (r.Spec.FailedAt == "" && r.Status.CurrentState == longhorn.InstanceStateError) {
log.Warnf("Replica %v that is not in the engine mode map is marked as failed, current state %v, engine name %v, active %v, no available backend %v",
r.Name, r.Status.CurrentState, r.Spec.EngineName, r.Spec.Active, isNoAvailableBackend)
e.Spec.LogRequested = true
r.Spec.LogRequested = true
setReplicaFailedAt(r, c.nowHandler())
r.Spec.DesireState = longhorn.InstanceStateStopped
}
}
return nil
}

// wait for monitoring to start
if e.Status.ReplicaModeMap == nil {
return nil
}

replicaList := []*longhorn.Replica{}
for _, r := range rs {
replicaList = append(replicaList, r)
}

restoreStatusMap := map[string]*longhorn.RestoreStatus{}
for addr, status := range e.Status.RestoreStatus {
rName := datastore.ReplicaAddressToReplicaName(addr, replicaList)
if _, exists := rs[rName]; exists {
restoreStatusMap[rName] = status
}
}

purgeStatusMap := map[string]*longhorn.PurgeStatus{}
for addr, status := range e.Status.PurgeStatus {
rName := datastore.ReplicaAddressToReplicaName(addr, replicaList)
if _, exists := rs[rName]; exists {
purgeStatusMap[rName] = status
}
}

// 1. remove ERR replicas
// 2. count RW replicas
healthyCount := 0
for rName, mode := range e.Status.ReplicaModeMap {
r := rs[rName]
if r == nil {
continue
}
restoreStatus := restoreStatusMap[rName]
purgeStatus := purgeStatusMap[rName]
if mode == longhorn.ReplicaModeERR ||
(restoreStatus != nil && restoreStatus.Error != "") ||
(purgeStatus != nil && purgeStatus.Error != "") {
if restoreStatus != nil && restoreStatus.Error != "" {
c.eventRecorder.Eventf(v, corev1.EventTypeWarning, constant.EventReasonFailedRestore, "replica %v failed the restore: %s", r.Name, restoreStatus.Error)
}
if purgeStatus != nil && purgeStatus.Error != "" {
c.eventRecorder.Eventf(v, corev1.EventTypeWarning, constant.EventReasonFailedSnapshotPurge, "replica %v failed the snapshot purge: %s", r.Name, purgeStatus.Error)
}
if r.Spec.FailedAt == "" {
log.Warnf("Replica %v is marked as failed, current state %v, mode %v, engine name %v, active %v", r.Name, r.Status.CurrentState, mode, r.Spec.EngineName, r.Spec.Active)
setReplicaFailedAt(r, c.nowHandler())
e.Spec.LogRequested = true
r.Spec.LogRequested = true
}
r.Spec.DesireState = longhorn.InstanceStateStopped
} else if mode == longhorn.ReplicaModeRW {
now := c.nowHandler()
if r.Spec.HealthyAt == "" {
c.backoff.DeleteEntry(r.Name)
// Set HealthyAt to distinguish this replica from one that has never been rebuilt.
r.Spec.HealthyAt = now
r.Spec.RebuildRetryCount = 0
}
// Set LastHealthyAt to record the last time this replica became RW in an engine.
if transitionTime, ok := e.Status.ReplicaTransitionTimeMap[rName]; !ok {
log.Errorf("BUG: Replica %v is in mode %v but transition time was not recorded", r.Name, mode)
r.Spec.LastHealthyAt = now
} else {
after, err := util.TimestampAfterTimestamp(transitionTime, r.Spec.LastHealthyAt)
if err != nil {
log.WithError(err).Warnf("Failed to check if replica %v transitioned to mode %v after it was last healthy", r.Name, mode)
}
if after || err != nil {
r.Spec.LastHealthyAt = now
}
}
healthyCount++
}
}
// If a replica failed at the attaching/migrating stage,
// there is no record in e.Status.ReplicaModeMap
for _, r := range rs {
if r.Spec.FailedAt == "" && r.Status.CurrentState == longhorn.InstanceStateError {
log.Warnf("Replica %v that is not in the engine mode map is marked as failed, current state %v, engine name %v, active %v",
r.Name, r.Status.CurrentState, r.Spec.EngineName, r.Spec.Active)
e.Spec.LogRequested = true
r.Spec.LogRequested = true
setReplicaFailedAt(r, c.nowHandler())
r.Spec.DesireState = longhorn.InstanceStateStopped
}
}

// Cannot continue evicting or replenishing replicas during engine migration.
isMigratingDone := !util.IsVolumeMigrating(v) && len(es) == 1

oldRobustness := v.Status.Robustness
if healthyCount == 0 { // no healthy replica exists, going to faulted
// ReconcileVolumeState() will deal with the faulted case
return nil
} else if healthyCount >= v.Spec.NumberOfReplicas {
v.Status.Robustness = longhorn.VolumeRobustnessHealthy
if oldRobustness == longhorn.VolumeRobustnessDegraded {
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonHealthy, "volume %v became healthy", v.Name)
}

if isMigratingDone {
// Evict replicas for the volume
if err := c.EvictReplicas(v, e, rs, healthyCount); err != nil {
return err
}

// Migrate local replica when Data Locality is on
// We turn off data locality while doing auto-attaching or restoring (e.g. frontend is disabled)
if v.Status.State == longhorn.VolumeStateAttached && !v.Status.FrontendDisabled &&
isDataLocalityBestEffort(v) && !hasLocalReplicaOnSameNodeAsEngine(e, rs) {
if err := c.replenishReplicas(v, e, rs, e.Spec.NodeID); err != nil {
return err
}
}

setting := c.ds.GetAutoBalancedReplicasSetting(v, log)
if setting != longhorn.ReplicaAutoBalanceDisabled {
if err := c.replenishReplicas(v, e, rs, ""); err != nil {
return err
}
}
}
} else { // healthyCount < v.Spec.NumberOfReplicas
v.Status.Robustness = longhorn.VolumeRobustnessDegraded
if oldRobustness != longhorn.VolumeRobustnessDegraded {
v.Status.LastDegradedAt = c.nowHandler()
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDegraded, "volume %v became degraded", v.Name)
}

cliAPIVersion, err := c.ds.GetDataEngineImageCLIAPIVersion(e.Status.CurrentImage, e.Spec.DataEngine)
if err != nil {
return err
}

// Rebuild is not supported when:
// 1. the volume is being migrated to another node.
// 2. the volume is an old restore/DR volume.
// 3. the volume size is being expanded.
isOldRestoreVolume := (v.Status.IsStandby || v.Status.RestoreRequired) &&
(types.IsDataEngineV1(e.Spec.DataEngine) && cliAPIVersion < engineapi.CLIVersionFour)
isInExpansion := v.Spec.Size != e.Status.CurrentSize
if isMigratingDone && !isOldRestoreVolume && !isInExpansion {
if err := c.replenishReplicas(v, e, rs, ""); err != nil {
return err
}
}
// replicas will be started by ReconcileVolumeState() later
}

for _, status := range e.Status.CloneStatus {
if status == nil {
continue
}

if status.State == engineapi.ProcessStateComplete && v.Status.CloneStatus.State != longhorn.VolumeCloneStateCompleted {
v.Status.CloneStatus.State = longhorn.VolumeCloneStateCompleted
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonVolumeCloneCompleted,
"finished cloning snapshot %v from source volume %v",
v.Status.CloneStatus.Snapshot, v.Status.CloneStatus.SourceVolume)
} else if status.State == engineapi.ProcessStateError && v.Status.CloneStatus.State != longhorn.VolumeCloneStateFailed {
v.Status.CloneStatus.State = longhorn.VolumeCloneStateFailed
c.eventRecorder.Eventf(v, corev1.EventTypeWarning, constant.EventReasonVolumeCloneFailed,
"failed to clone snapshot %v from source volume %v: %v",
v.Status.CloneStatus.Snapshot, v.Status.CloneStatus.SourceVolume, status.Error)
}
}

return nil
}
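Editor's note: the robustness decision in ReconcileEngineReplicaState boils down to comparing the number of RW replicas reported by the engine with the desired replica count. A compact illustrative sketch of that mapping follows (made-up helper, not upstream code; the string values stand in for the longhorn.VolumeRobustness* constants used above):

// Sketch only: how healthyCount maps to volume robustness in the logic above.
func robustnessFor(healthyCount, desiredReplicas int) string {
	switch {
	case healthyCount == 0:
		// No healthy replica left; ReconcileVolumeState() handles the faulted case.
		return "faulted"
	case healthyCount >= desiredReplicas:
		return "healthy"
	default:
		return "degraded"
	}
}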

CodeFactor warning (controller/volume_controller.go#L616-L856): Very Complex Method
func isAutoSalvageNeeded(rs map[string]*longhorn.Replica) bool {
if isFirstAttachment(rs) {
return areAllReplicasFailed(rs)
@@ -1251,231 +1251,231 @@
}

// ReconcileVolumeState handles the attaching and detaching of volume
func (c *VolumeController) ReconcileVolumeState(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to reconcile volume state for %v", v.Name)
}()

log := getLoggerForVolume(c.logger, v)

e, err := c.ds.PickVolumeCurrentEngine(v, es)
if err != nil {
return err
}

if v.Status.CurrentImage == "" {
v.Status.CurrentImage = v.Spec.Image
}

if err := c.checkAndInitVolumeRestore(v); err != nil {
return err
}

if err := c.updateRequestedBackupForVolumeRestore(v, e); err != nil {
return err
}

if err := c.checkAndInitVolumeClone(v, e, log); err != nil {
return err
}

if err := c.updateRequestedDataSourceForVolumeCloning(v, e); err != nil {
return err
}

isNewVolume, e, err := c.reconcileVolumeCreation(v, e, es, rs)
if err != nil {
return err
}
if e == nil {
log.Warnf("Engine is nil while reconcile volume creation")
return nil
}

c.reconcileLogRequest(e, rs)

if err := c.reconcileVolumeCondition(v, e, rs, log); err != nil {
return err
}

if err := c.reconcileVolumeSize(v, e, rs); err != nil {
return err
}

v.Status.FrontendDisabled = v.Spec.DisableFrontend

// Clear SalvageRequested flag if SalvageExecuted flag has been set.
if e.Spec.SalvageRequested && e.Status.SalvageExecuted {
e.Spec.SalvageRequested = false
}

if isAutoSalvageNeeded(rs) {
v.Status.Robustness = longhorn.VolumeRobustnessFaulted
// If the volume is faulted, we don't need to have RWX fast failover.
// If shareManager is delinquent, clear both delinquent and stale state.
// If we don't do that, the volume will be stuck in an auto-salvage loop.
// See https://github.com/longhorn/longhorn/issues/9089
if err := c.handleDelinquentAndStaleStateForFaultedRWXVolume(v); err != nil {
return err
}

autoSalvage, err := c.ds.GetSettingAsBool(types.SettingNameAutoSalvage)
if err != nil {
return err
}
// To make sure that we don't miss the `isAutoSalvageNeeded` event, this IF statement makes sure `e.Spec.SalvageRequested=true`
// is persisted in etcd before Longhorn salvages the failed replicas in the IF statement below it.
// More explanation: when all replicas fail, Longhorn tries to set `e.Spec.SalvageRequested=true`
// and to detach the volume by setting `v.Status.CurrentNodeID = ""`.
// Because at the end of syncVolume(), Longhorn updates CRs in the order: replicas, engine, volume,
// when the volume changes from v.Status.State == longhorn.VolumeStateAttached to v.Status.State == longhorn.VolumeStateDetached,
// we know that the volume resource has been updated, and therefore the engine resource has also been updated and persisted in etcd.
// At this moment, Longhorn goes into the IF statement below this one and salvages all replicas.
if autoSalvage && !v.Status.IsStandby && !v.Status.RestoreRequired {
// Since all replicas failed and autoSalvage is enabled, mark the engine controller salvage requested
e.Spec.SalvageRequested = true
log.Infof("All replicas are failed, set engine salvageRequested to %v", e.Spec.SalvageRequested)
}
// make sure the volume is detached before automatically salvaging replicas
if autoSalvage && v.Status.State == longhorn.VolumeStateDetached && !v.Status.IsStandby && !v.Status.RestoreRequired {
log.Info("All replicas are failed, auto-salvaging volume")

lastFailedAt := time.Time{}
failedUsableReplicas := map[string]*longhorn.Replica{}
dataExists := false

for _, r := range rs {
if r.Spec.HealthyAt == "" {
continue
}
dataExists = true
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
continue
}
if isDownOrDeleted, err := c.ds.IsNodeDownOrDeleted(r.Spec.NodeID); err != nil {
log.WithField("replica", r.Name).WithError(err).Warnf("Failed to check if node %v is still running for failed replica", r.Spec.NodeID)
continue
} else if isDownOrDeleted {
continue
}
node, err := c.ds.GetNodeRO(r.Spec.NodeID)
if err != nil {
log.WithField("replica", r.Name).WithError(err).Warnf("Failed to get node %v for failed replica", r.Spec.NodeID)
// The node cannot be fetched; skip this replica to avoid dereferencing a nil node below.
continue
}
diskSchedulable := false
for _, diskStatus := range node.Status.DiskStatus {
if diskStatus.DiskUUID == r.Spec.DiskID {
if types.GetCondition(diskStatus.Conditions, longhorn.DiskConditionTypeSchedulable).Status == longhorn.ConditionStatusTrue {
diskSchedulable = true
break
}
}
}
if !diskSchedulable {
continue
}
failedAt, err := util.ParseTime(r.Spec.FailedAt)
if err != nil {
log.WithField("replica", r.Name).WithError(err).Warn("Failed to parse FailedAt timestamp for replica")
continue
}
if failedAt.After(lastFailedAt) {
lastFailedAt = failedAt
}
// all failedUsableReplicas contain data
failedUsableReplicas[r.Name] = r
}
if !dataExists {
log.Warn("Failed to auto salvage volume: no data exists")
} else {
log.Infof("Bringing up %v replicas for auto-salvage", len(failedUsableReplicas))

// This salvage is for revision counter enabled case
salvaged := false
// Bring up the replicas for auto-salvage
for _, r := range failedUsableReplicas {
if util.TimestampWithinLimit(lastFailedAt, r.Spec.FailedAt, AutoSalvageTimeLimit) {
setReplicaFailedAt(r, "")
log.WithField("replica", r.Name).Warn("Automatically salvaging volume replica")
msg := fmt.Sprintf("Replica %v of volume %v will be automatically salvaged", r.Name, v.Name)
c.eventRecorder.Event(v, corev1.EventTypeWarning, constant.EventReasonAutoSalvaged, msg)
salvaged = true
}
}
if salvaged {
// remount the reattached volume later if possible
v.Status.RemountRequestedAt = c.nowHandler()
msg := fmt.Sprintf("Volume %v requested remount at %v after automatically salvaging replicas", v.Name, v.Status.RemountRequestedAt)
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonRemount, msg)
v.Status.Robustness = longhorn.VolumeRobustnessUnknown
return nil
}
}
}
} else { // !isAutoSalvageNeeded
if v.Status.Robustness == longhorn.VolumeRobustnessFaulted && v.Status.State == longhorn.VolumeStateDetached {
v.Status.Robustness = longhorn.VolumeRobustnessUnknown
// The volume was faulty and there are usable replicas.
// Therefore, we set RemountRequestedAt so that KubernetesPodController restarts the workload pod
v.Status.RemountRequestedAt = c.nowHandler()
msg := fmt.Sprintf("Volume %v requested remount at %v", v.Name, v.Status.RemountRequestedAt)
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonRemount, msg)
return nil
}

// Reattach volume if
// - volume is detached unexpectedly and there are still healthy replicas
// - the engine died unexpectedly and there are still healthy replicas when the volume is not attached
if e.Status.CurrentState == longhorn.InstanceStateError {
if v.Status.CurrentNodeID != "" || (v.Spec.NodeID != "" && v.Status.CurrentNodeID == "" && v.Status.State != longhorn.VolumeStateAttached) {
log.Warn("Reattaching the volume since its engine died unexpectedly")
msg := fmt.Sprintf("Engine of volume %v died unexpectedly, reattaching the volume", v.Name)
c.eventRecorder.Event(v, corev1.EventTypeWarning, constant.EventReasonDetachedUnexpectedly, msg)
e.Spec.LogRequested = true
for _, r := range rs {
if r.Status.CurrentState == longhorn.InstanceStateRunning {
r.Spec.LogRequested = true
rs[r.Name] = r
}
}
v.Status.Robustness = longhorn.VolumeRobustnessFaulted
// If the volume is faulted, we don't need to have RWX fast failover.
// If shareManager is delinquent, clear both delinquent and stale state.
// If we don't do that, the volume will be stuck in an auto-salvage loop.
// See https://github.com/longhorn/longhorn/issues/9089
if err := c.handleDelinquentAndStaleStateForFaultedRWXVolume(v); err != nil {
return err
}
}
}
}

// check volume mount status
c.requestRemountIfFileSystemReadOnly(v, e)

if err := c.reconcileAttachDetachStateMachine(v, e, rs, isNewVolume, log); err != nil {
return err
}

if v.Status.CurrentNodeID != "" &&
v.Status.State == longhorn.VolumeStateAttached &&
e.Status.CurrentState == longhorn.InstanceStateRunning {
if e.Spec.RequestedBackupRestore != "" {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeRestore, longhorn.ConditionStatusTrue, longhorn.VolumeConditionReasonRestoreInProgress, "")
}

// TODO: reconcileVolumeSize
// The engine expansion is complete
if v.Status.ExpansionRequired && v.Spec.Size == e.Status.CurrentSize {
v.Status.ExpansionRequired = false
v.Status.FrontendDisabled = false
}
}

return c.checkAndFinishVolumeRestore(v, e, rs)
}
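Editor's note: in the auto-salvage path above, only failed-but-usable replicas whose FailedAt lies within AutoSalvageTimeLimit of the most recent failure are brought back. Below is a rough stand-alone illustration of that filter; it assumes only the standard time package, re-implements the comparison inline rather than calling util.TimestampWithinLimit, and uses a made-up function name:

// Sketch only: pick the replicas that failed close enough to the latest
// failure to be salvaged together.
func salvageCandidates(failedAt map[string]time.Time, limit time.Duration) []string {
	var lastFailedAt time.Time
	for _, t := range failedAt {
		if t.After(lastFailedAt) {
			lastFailedAt = t
		}
	}
	var names []string
	for name, t := range failedAt {
		if lastFailedAt.Sub(t) <= limit {
			names = append(names, name)
		}
	}
	return names
}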

CodeFactor notice (controller/volume_controller.go#L1254-L1478): Complex Method
func (c *VolumeController) handleDelinquentAndStaleStateForFaultedRWXVolume(v *longhorn.Volume) error {
if !isRegularRWXVolume(v) {
return nil
@@ -1518,139 +1518,139 @@
}
}

func (c *VolumeController) reconcileAttachDetachStateMachine(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, isNewVolume bool, log *logrus.Entry) error {
// Here is the AD state machine graph
// https://github.com/longhorn/longhorn/blob/master/enhancements/assets/images/longhorn-volumeattachment/volume-controller-ad-logic.png

if isNewVolume || v.Status.State == "" {
v.Status.State = longhorn.VolumeStateCreating
return nil
}

if v.Spec.NodeID == "" {
if v.Status.CurrentNodeID == "" {
switch v.Status.State {
case longhorn.VolumeStateAttached, longhorn.VolumeStateAttaching, longhorn.VolumeStateCreating:
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
case longhorn.VolumeStateDetaching:
c.closeVolumeDependentResources(v, e, rs)
if c.verifyVolumeDependentResourcesClosed(e, rs) {
v.Status.State = longhorn.VolumeStateDetached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDetached, "volume %v has been detached", v.Name)
}
case longhorn.VolumeStateDetached:
// This is a stable state.
// We attempt to close the resources anyway to make sure that they are closed
c.closeVolumeDependentResources(v, e, rs)
}
return nil
}
if v.Status.CurrentNodeID != "" {
switch v.Status.State {
case longhorn.VolumeStateAttaching, longhorn.VolumeStateAttached, longhorn.VolumeStateDetached:
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
case longhorn.VolumeStateDetaching:
c.closeVolumeDependentResources(v, e, rs)
if c.verifyVolumeDependentResourcesClosed(e, rs) {
v.Status.CurrentNodeID = ""
v.Status.State = longhorn.VolumeStateDetached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDetached, "volume %v has been detached", v.Name)
}
}
return nil
}
}

if v.Spec.NodeID != "" {
if v.Status.CurrentNodeID == "" {
switch v.Status.State {
case longhorn.VolumeStateAttached:
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
case longhorn.VolumeStateDetaching:
c.closeVolumeDependentResources(v, e, rs)
if c.verifyVolumeDependentResourcesClosed(e, rs) {
v.Status.State = longhorn.VolumeStateDetached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDetached, "volume %v has been detached", v.Name)
}
case longhorn.VolumeStateDetached:
if err := c.openVolumeDependentResources(v, e, rs, log); err != nil {
return err
}
v.Status.State = longhorn.VolumeStateAttaching
case longhorn.VolumeStateAttaching:
if err := c.openVolumeDependentResources(v, e, rs, log); err != nil {
return err
}
if c.areVolumeDependentResourcesOpened(e, rs) {
v.Status.CurrentNodeID = v.Spec.NodeID
v.Status.State = longhorn.VolumeStateAttached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonAttached, "volume %v has been attached to %v", v.Name, v.Status.CurrentNodeID)
}
}
return nil
}

if v.Status.CurrentNodeID != "" {
if v.Spec.NodeID == v.Status.CurrentNodeID {
switch v.Status.State {
case longhorn.VolumeStateAttaching, longhorn.VolumeStateDetached:
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
case longhorn.VolumeStateDetaching:
c.closeVolumeDependentResources(v, e, rs)
if c.verifyVolumeDependentResourcesClosed(e, rs) {
v.Status.CurrentNodeID = ""
v.Status.State = longhorn.VolumeStateDetached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDetached, "volume %v has been detached", v.Name)
}
case longhorn.VolumeStateAttached:
// This is a stable state
// Try to openVolumeDependentResources so that we start the newly added replicas if they exist
if err := c.openVolumeDependentResources(v, e, rs, log); err != nil {
return err
}
if !c.areVolumeDependentResourcesOpened(e, rs) {
log.Warnf("Volume is attached but dependent resources are not opened")
}
}
return nil
}
if v.Spec.NodeID != v.Status.CurrentNodeID {
switch v.Status.State {
case longhorn.VolumeStateDetached, longhorn.VolumeStateAttaching:
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
case longhorn.VolumeStateDetaching:
c.closeVolumeDependentResources(v, e, rs)
if c.verifyVolumeDependentResourcesClosed(e, rs) {
v.Status.CurrentNodeID = ""
v.Status.State = longhorn.VolumeStateDetached
c.eventRecorder.Eventf(v, corev1.EventTypeNormal, constant.EventReasonDetached, "volume %v has been detached", v.Name)
}
case longhorn.VolumeStateAttached:
if v.Spec.Migratable && v.Spec.AccessMode == longhorn.AccessModeReadWriteMany && v.Status.CurrentMigrationNodeID != "" {
if err := c.openVolumeDependentResources(v, e, rs, log); err != nil {
return err
}
if !c.areVolumeDependentResourcesOpened(e, rs) {
log.Warnf("Volume is attached but dependent resources are not opened")
}
} else {
c.closeVolumeDependentResources(v, e, rs)
v.Status.State = longhorn.VolumeStateDetaching
}
}
return nil
}
}
}

return nil
}

CodeFactor notice (controller/volume_controller.go#L1521-L1653): Complex Method
func (c *VolumeController) reconcileVolumeCreation(v *longhorn.Volume, e *longhorn.Engine, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (bool, *longhorn.Engine, error) {
// first time engine creation etc

Expand Down Expand Up @@ -1691,230 +1691,230 @@
}
}

func (c *VolumeController) reconcileVolumeCondition(v *longhorn.Volume, e *longhorn.Engine,
rs map[string]*longhorn.Replica, log *logrus.Entry) error {
numSnapshots := len(e.Status.Snapshots) - 1 // Counting volume-head here would be confusing.
if numSnapshots > VolumeSnapshotsWarningThreshold {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusTrue,
longhorn.VolumeConditionReasonTooManySnapshots,
fmt.Sprintf("Snapshots count is %v over the warning threshold %v", numSnapshots,
VolumeSnapshotsWarningThreshold))
} else {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeTooManySnapshots, longhorn.ConditionStatusFalse,
"", "")
}

scheduled := true
aggregatedReplicaScheduledError := util.NewMultiError()
for _, r := range rs {
// check whether the replica need to be scheduled
if r.Spec.NodeID != "" {
continue
}
if v.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
if v.Spec.NodeID == "" {
continue
}

r.Spec.HardNodeAffinity = v.Spec.NodeID
}
scheduledReplica, multiError, err := c.scheduler.ScheduleReplica(r, rs, v)
if err != nil {
return err
}
aggregatedReplicaScheduledError.Append(multiError)

if scheduledReplica == nil {
if r.Spec.HardNodeAffinity == "" {
log.WithField("replica", r.Name).Warn("Failed to schedule replica")
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusFalse,
longhorn.VolumeConditionReasonReplicaSchedulingFailure, "")
} else {
log.WithField("replica", r.Name).Warnf("Failed to schedule replica of volume with HardNodeAffinity = %v", r.Spec.HardNodeAffinity)
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusFalse,
longhorn.VolumeConditionReasonLocalReplicaSchedulingFailure, "")
}
scheduled = false
// requeue the volume to retry to schedule the replica after 30s
c.enqueueVolumeAfter(v, 30*time.Second)
} else {
rs[r.Name] = scheduledReplica
}
}

failureMessage := ""

if len(rs) != v.Spec.NumberOfReplicas {
scheduled = false
}

replenishCount, _ := c.getReplenishReplicasCount(v, rs, e)
if scheduled && replenishCount == 0 {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusTrue, "", "")
} else if v.Status.CurrentNodeID == "" {
allowCreateDegraded, err := c.ds.GetSettingAsBool(types.SettingNameAllowVolumeCreationWithDegradedAvailability)
if err != nil {
return err
}
if allowCreateDegraded {
atLeastOneReplicaAvailable := false
for _, r := range rs {
if r.Spec.NodeID != "" && r.Spec.FailedAt == "" {
atLeastOneReplicaAvailable = true
break
}
}
if atLeastOneReplicaAvailable {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusTrue, "",
"Reset schedulable due to allow volume creation with degraded availability")
scheduled = true
}
}
}
if !scheduled {
if len(aggregatedReplicaScheduledError) == 0 {
aggregatedReplicaScheduledError.Append(util.NewMultiError(longhorn.ErrorReplicaScheduleSchedulingFailed))
}
failureMessage = aggregatedReplicaScheduledError.Join()
scheduledCondition := types.GetCondition(v.Status.Conditions, longhorn.VolumeConditionTypeScheduled)
if scheduledCondition.Status == longhorn.ConditionStatusFalse {
if scheduledCondition.Reason == longhorn.VolumeConditionReasonReplicaSchedulingFailure &&
scheduledCondition.Message != "" {
failureMessage = scheduledCondition.Message
}
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusFalse,
scheduledCondition.Reason, failureMessage)
}
}

if err := c.ds.UpdatePVAnnotation(v, types.PVAnnotationLonghornVolumeSchedulingError, failureMessage); err != nil {
log.Warnf("Failed to update PV annotation for volume %v", v.Name)
}

return nil
}

CodeFactor notice (controller/volume_controller.go#L1694-L1803): Complex Method
func isVolumeOfflineUpgrade(v *longhorn.Volume) bool {
return v.Status.State == longhorn.VolumeStateDetached && v.Status.CurrentImage != v.Spec.Image
}

func (c *VolumeController) openVolumeDependentResources(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, log *logrus.Entry) error {
if isVolumeOfflineUpgrade(v) {
log.Info("Waiting for offline volume upgrade to finish")
return nil
}

for _, r := range rs {
// Don't attempt to start the replica or do anything else if it hasn't been scheduled.
if r.Spec.NodeID == "" {
continue
}
canIMLaunchReplica, err := c.canInstanceManagerLaunchReplica(r)
if err != nil {
return err
}
if canIMLaunchReplica {
if r.Spec.FailedAt == "" && r.Spec.Image == v.Status.CurrentImage {
if r.Status.CurrentState == longhorn.InstanceStateStopped {
r.Spec.DesireState = longhorn.InstanceStateRunning
}
}
} else {
// wait for the IM to start when the volume is upgrading
if isVolumeUpgrading(v) {
continue
}

// If the engine isn't attached or the node goes down, the replica can be marked as failed.
// In the attached mode, we can determine whether the replica can fail by relying on the data plane's connectivity status.
nodeDeleted, err := c.ds.IsNodeDeleted(r.Spec.NodeID)
if err != nil {
return err
}

if v.Status.State != longhorn.VolumeStateAttached || nodeDeleted {
msg := fmt.Sprintf("Replica %v is marked as failed because the volume %v is not attached and the instance manager is unable to launch the replica", r.Name, v.Name)
if nodeDeleted {
msg = fmt.Sprintf("Replica %v is marked as failed since the node %v is deleted.", r.Name, r.Spec.NodeID)
}
log.WithField("replica", r.Name).Warn(msg)
if r.Spec.FailedAt == "" {
setReplicaFailedAt(r, c.nowHandler())
}
r.Spec.DesireState = longhorn.InstanceStateStopped
}
}
rs[r.Name] = r
}

replicaAddressMap := map[string]string{}
for _, r := range rs {
// Ignore unscheduled replicas
if r.Spec.NodeID == "" {
continue
}
if r.Spec.Image != v.Status.CurrentImage {
continue
}
if r.Spec.EngineName != e.Name {
continue
}
if r.Spec.FailedAt != "" {
continue
}
if r.Status.CurrentState == longhorn.InstanceStateError {
continue
}
// wait for all potentially healthy replicas to become running
if r.Status.CurrentState != longhorn.InstanceStateRunning {
return nil
}
if r.Status.IP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but IP is empty")
continue
}
if r.Status.StorageIP == "" {
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
continue
}
if r.Status.Port == 0 {
log.WithField("replica", r.Name).Warn("Replica is running but Port is empty")
continue
}
if _, ok := e.Spec.ReplicaAddressMap[r.Name]; !ok && util.IsVolumeMigrating(v) &&
e.Spec.NodeID == v.Spec.NodeID {
// The volume is migrating from this engine. Don't allow new replicas to be added until migration is
// complete per https://github.com/longhorn/longhorn/issues/6961.
log.WithField("replica", r.Name).Warn("Replica is running, but can't be added while migration is ongoing")
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}
if len(replicaAddressMap) == 0 {
return fmt.Errorf("no healthy or scheduled replica for starting")
}

if e.Spec.NodeID != "" && e.Spec.NodeID != v.Spec.NodeID {
return fmt.Errorf("engine is on node %v vs volume on %v, must detach first",
e.Spec.NodeID, v.Status.CurrentNodeID)
}
e.Spec.NodeID = v.Spec.NodeID
e.Spec.ReplicaAddressMap = replicaAddressMap
e.Spec.DesireState = longhorn.InstanceStateRunning
// The volume may be activated
e.Spec.DisableFrontend = v.Status.FrontendDisabled
e.Spec.Frontend = v.Spec.Frontend

return nil
}

CodeFactor notice (controller/volume_controller.go#L1808-L1917): Complex Method
func (c *VolumeController) areVolumeDependentResourcesOpened(e *longhorn.Engine, rs map[string]*longhorn.Replica) bool {
// At least 1 replica should be running
hasRunningReplica := false
@@ -1927,65 +1927,65 @@
return hasRunningReplica && e.Status.CurrentState == longhorn.InstanceStateRunning
}

func (c *VolumeController) closeVolumeDependentResources(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeRestore, longhorn.ConditionStatusFalse, "", "")

if v.Status.Robustness != longhorn.VolumeRobustnessFaulted {
v.Status.Robustness = longhorn.VolumeRobustnessUnknown
} else {
if v.Status.RestoreRequired || v.Status.IsStandby {
v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeRestore, longhorn.ConditionStatusFalse, longhorn.VolumeConditionReasonRestoreFailure, "All replica restore failed and the volume became Faulted")
}
}

if e.Spec.DesireState != longhorn.InstanceStateStopped || e.Spec.NodeID != "" {
if v.Status.Robustness == longhorn.VolumeRobustnessFaulted {
e.Spec.LogRequested = true
}
// Prevent this field from being unset when restore/DR volumes crash unexpectedly.
if !v.Status.RestoreRequired && !v.Status.IsStandby {
e.Spec.BackupVolume = ""
}
e.Spec.RequestedBackupRestore = ""
e.Spec.NodeID = ""
e.Spec.DesireState = longhorn.InstanceStateStopped
}
// must make sure the engine is stopped before stopping the replicas,
// otherwise we may corrupt the data
if e.Status.CurrentState != longhorn.InstanceStateStopped {
return
}

// check if any replica has been RW yet
dataExists := false
for _, r := range rs {
if r.Spec.HealthyAt != "" {
dataExists = true
break
}
}
for _, r := range rs {
if r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && dataExists {
// This replica must have been rebuilding. Mark it as failed.
setReplicaFailedAt(r, c.nowHandler())
// Unscheduled replicas are marked failed here when volume is detached.
// Check if NodeId or DiskID is empty to avoid deleting reusableFailedReplica when replenished.
if r.Spec.NodeID == "" || r.Spec.DiskID == "" {
r.Spec.RebuildRetryCount = scheduler.FailedReplicaMaxRetryCount
}
}
if r.Spec.DesireState != longhorn.InstanceStateStopped {
if v.Status.Robustness == longhorn.VolumeRobustnessFaulted {
r.Spec.LogRequested = true
}
r.Spec.DesireState = longhorn.InstanceStateStopped
rs[r.Name] = r
}
}
}

CodeFactor notice (controller/volume_controller.go#L1930-L1988): Complex Method
func (c *VolumeController) verifyVolumeDependentResourcesClosed(e *longhorn.Engine, rs map[string]*longhorn.Replica) bool {
allReplicasStopped := func() bool {
for _, r := range rs {
@@ -2210,114 +2210,114 @@
// replenishReplicas will keep replicas count to v.Spec.NumberOfReplicas
// It will count all the potentially usable replicas, since some replicas maybe
// blank or in rebuilding state
func (c *VolumeController) replenishReplicas(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, hardNodeAffinity string) error {
concurrentRebuildingLimit, err := c.ds.GetSettingAsInt(types.SettingNameConcurrentReplicaRebuildPerNodeLimit)
if err != nil {
return err
}
disableReplicaRebuild := concurrentRebuildingLimit == 0

// If replica rebuild is disabled, skip all rebuilds except the first-time creation.
if (len(rs) != 0) && disableReplicaRebuild {
return nil
}

if util.IsVolumeMigrating(v) {
return nil
}

if (len(rs) != 0) && v.Status.State != longhorn.VolumeStateAttached {
return nil
}

if e == nil {
return fmt.Errorf("replenishReplica needs a valid engine")
}

// To prevent a duplicate IP for different replicas from causing problems,
// wait for the engine to:
// 1. make sure the existing healthy replicas have shown up in engine.spec.ReplicaAddressMap
// 2. recognize all the replicas from spec.ReplicaAddressMap in status.ReplicaModeMap
// 3. clean up the extra entries in status.ReplicaModeMap
// https://github.com/longhorn/longhorn/issues/687
if !c.hasEngineStatusSynced(e, rs) {
return nil
}

if currentRebuilding := getRebuildingReplicaCount(e); currentRebuilding != 0 {
return nil
}

log := getLoggerForVolume(c.logger, v)

replenishCount, updateNodeAffinity := c.getReplenishReplicasCount(v, rs, e)
if hardNodeAffinity == "" && updateNodeAffinity != "" {
hardNodeAffinity = updateNodeAffinity
}

newVolume := len(rs) == 0

// For regular rebuild case or data locality case, rebuild one replica at a time
if (!newVolume && replenishCount > 0) || hardNodeAffinity != "" {
replenishCount = 1
}
for i := 0; i < replenishCount; i++ {
reusableFailedReplica, err := c.scheduler.CheckAndReuseFailedReplica(rs, v, hardNodeAffinity)
if err != nil {
return errors.Wrapf(err, "failed to reuse a failed replica during replica replenishment")
}

if reusableFailedReplica != nil {
if !c.backoff.IsInBackOffSinceUpdate(reusableFailedReplica.Name, time.Now()) {
log.Infof("Failed replica %v will be reused during rebuilding", reusableFailedReplica.Name)
setReplicaFailedAt(reusableFailedReplica, "")
reusableFailedReplica.Spec.HealthyAt = ""

if datastore.IsReplicaRebuildingFailed(reusableFailedReplica) {
reusableFailedReplica.Spec.RebuildRetryCount++
}
c.backoff.Next(reusableFailedReplica.Name, time.Now())

rs[reusableFailedReplica.Name] = reusableFailedReplica
continue
}
log.Warnf("Failed to reuse failed replica %v immediately, backoff period is %v now",
reusableFailedReplica.Name, c.backoff.Get(reusableFailedReplica.Name).Seconds())
// Couldn't reuse the replica. Add the volume back to the workqueue to check it later
c.enqueueVolumeAfter(v, c.backoff.Get(reusableFailedReplica.Name))
}
if checkBackDuration := c.scheduler.RequireNewReplica(rs, v, hardNodeAffinity); checkBackDuration == 0 {
newReplica := c.newReplica(v, e, hardNodeAffinity)

// Bypassing the precheck when hardNodeAffinity is provided, because
// we expect the new replica to be relocated to a specific node.
if hardNodeAffinity == "" {
if multiError, err := c.precheckCreateReplica(newReplica, rs, v); err != nil {
log.WithError(err).Warnf("Unable to create new replica %v", newReplica.Name)

aggregatedReplicaScheduledError := util.NewMultiError(longhorn.ErrorReplicaSchedulePrecheckNewReplicaFailed)
if multiError != nil {
aggregatedReplicaScheduledError.Append(multiError)
}

v.Status.Conditions = types.SetCondition(v.Status.Conditions,
longhorn.VolumeConditionTypeScheduled, longhorn.ConditionStatusFalse,
longhorn.VolumeConditionReasonReplicaSchedulingFailure, aggregatedReplicaScheduledError.Join())
continue
}
}

if err := c.createReplica(newReplica, v, rs, !newVolume); err != nil {
return err
}
} else {
// Couldn't create new replica. Add the volume back to the workqueue to check it later
c.enqueueVolumeAfter(v, checkBackDuration)
}
}
return nil
}
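Editor's note: replenishReplicas above refuses to add replicas until hasEngineStatusSynced returns true; that helper's body is collapsed near the end of this view. Going by the comment in replenishReplicas, the check essentially requires the engine's spec.ReplicaAddressMap and status.ReplicaModeMap to agree. A hypothetical minimal version of such a check, an illustrative sketch rather than the upstream implementation, might look like:

// Sketch only: the engine is considered "synced" once every replica in the
// spec address map appears in the status mode map and there are no extras.
func engineStatusSynced(specAddresses map[string]string, statusModes map[string]string) bool {
	if len(specAddresses) != len(statusModes) {
		return false
	}
	for name := range specAddresses {
		if _, ok := statusModes[name]; !ok {
			return false
		}
	}
	return true
}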

CodeFactor notice (controller/volume_controller.go#L2213-L2320): Complex Method
func getRebuildingReplicaCount(e *longhorn.Engine) int {
rebuilding := 0
replicaExists := make(map[string]bool)
@@ -2376,97 +2376,97 @@
return adjustCount
}

func (c *VolumeController) getReplicaCountForAutoBalanceBestEffort(v *longhorn.Volume, e *longhorn.Engine,
rs map[string]*longhorn.Replica,
fnCount replicaAutoBalanceCount) (int, []string, []string) {

log := getLoggerForVolume(c.logger, v).WithField("replicaAutoBalanceOption", longhorn.ReplicaAutoBalanceBestEffort)

var err error
defer func() {
if err != nil {
log.WithError(err).Warn("Skip replica auto-balance")
}
}()

setting := c.ds.GetAutoBalancedReplicasSetting(v, log)
if setting != longhorn.ReplicaAutoBalanceBestEffort {
return 0, nil, []string{}
}

if v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
log.Warnf("Cannot auto-balance volume in %s state", v.Status.Robustness)
return 0, nil, []string{}
}

adjustCount := 0

var unusedCount int
unusedCount, extraRNames, err := fnCount(v, e, rs)
if unusedCount == 0 && len(extraRNames) <= 1 {
extraRNames = nil
}

var mostExtraRList []string
var mostExtraRCount int
var leastExtraROwners []string
var leastExtraRCount int
for owner, rNames := range extraRNames {
rNameCount := len(rNames)
if leastExtraRCount == 0 || rNameCount < leastExtraRCount {
leastExtraRCount = rNameCount
leastExtraROwners = []string{}
leastExtraROwners = append(leastExtraROwners, owner)
} else if rNameCount == leastExtraRCount {
leastExtraROwners = append(leastExtraROwners, owner)
}
if rNameCount > mostExtraRCount {
mostExtraRList = rNames
mostExtraRCount = rNameCount
}
}

if mostExtraRCount == 0 || mostExtraRCount == leastExtraRCount {
mostExtraRList = nil
leastExtraROwners = []string{}
} else {
adjustCount = mostExtraRCount - leastExtraRCount - 1
log.Infof("Found %v replicas from %v to balance to one of node in %v", adjustCount, mostExtraRList, leastExtraROwners)
}

if adjustCount == 0 {
replicasUnderDiskPressure, err := c.getReplicasUnderDiskPressure()
if err != nil {
log.WithError(err).Warn("Failed to get replicas in disk pressure")
return 0, nil, []string{}
}

for replicaName, replica := range rs {
if !replicasUnderDiskPressure[replicaName] {
continue
}

err := c.checkReplicaDiskPressuredSchedulableCandidates(v, replica)
if err != nil {
log.WithError(err).Tracef("Cannot find replica %v disk pressure candidates", replicaName)
continue
}

adjustCount++
mostExtraRList = append(mostExtraRList, replicaName)
leastExtraROwners = append(leastExtraROwners, replica.Spec.NodeID)
}

if adjustCount == 0 {
log.Trace("No replicas in disk pressure")
} else {
log.Infof("Found %v replicas in disk pressure with schedulable candidates", adjustCount)
}
}

return adjustCount, mostExtraRList, leastExtraROwners
}
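Editor's note, a concrete feel for the best-effort arithmetic above: if node A holds three replicas and node B holds one, fnCount reports two extra replicas for A and none for B, so mostExtraRCount = 2, leastExtraRCount = 0, and adjustCount = 2 - 0 - 1 = 1; one replica from A becomes a candidate to rebalance onto the least-loaded owner B.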

CodeFactor notice (controller/volume_controller.go#L2379-L2469): Complex Method
func (c *VolumeController) getReplicasUnderDiskPressure() (map[string]bool, error) {
settingDiskPressurePercentage, err := c.ds.GetSettingAsInt(types.SettingNameReplicaAutoBalanceDiskPressurePercentage)
if err != nil {
Expand Down Expand Up @@ -2602,116 +2602,116 @@
return nil
}

func (c *VolumeController) getReplicaCountForAutoBalanceZone(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (int, map[string][]string, error) {
log := getLoggerForVolume(c.logger, v).WithField("replicaAutoBalanceType", "zone")

readyNodes, err := c.listReadySchedulableAndScheduledNodesRO(v, rs, log)
if err != nil {
return 0, nil, err
}

var usedZones []string
var usedNodes []string
zoneExtraRs := make(map[string][]string)
// Count the engine node replica first so it doesn't get included in the
// duplicates list.
for _, r := range rs {
node, exist := readyNodes[r.Spec.NodeID]
if !exist {
continue
}
if r.Spec.NodeID == e.Spec.NodeID {
nZone := node.Status.Zone
zoneExtraRs[nZone] = []string{}
usedZones = append(usedZones, nZone)
break
}
}
for _, r := range rs {
if r.Status.CurrentState != longhorn.InstanceStateRunning {
continue
}

node, exist := readyNodes[r.Spec.NodeID]
if !exist {
// replica is on a node that does not count for auto-balance; it could get evicted
continue
}

if r.Spec.NodeID == e.Spec.NodeID {
// replica on the engine node does not count for auto-balance
continue
}

nZone := node.Status.Zone
_, exist = zoneExtraRs[nZone]
if exist {
zoneExtraRs[nZone] = append(zoneExtraRs[nZone], r.Name)
} else {
zoneExtraRs[nZone] = []string{}
usedZones = append(usedZones, nZone)
}
if !util.Contains(usedNodes, r.Spec.NodeID) {
usedNodes = append(usedNodes, r.Spec.NodeID)
}
}
log.Debugf("Found %v used zones %v", len(usedZones), usedZones)
log.Debugf("Found %v used nodes %v", len(usedNodes), usedNodes)
if v.Spec.NumberOfReplicas == len(zoneExtraRs) {
log.Debugf("Balanced, %v volume replicas are running on different zones", v.Spec.NumberOfReplicas)
return 0, zoneExtraRs, nil
}

ei := &longhorn.EngineImage{}
if types.IsDataEngineV1(v.Spec.DataEngine) {
ei, err = c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
return 0, nil, err
}
}

unusedZone := make(map[string][]string)
for nodeName, node := range readyNodes {
if util.Contains(usedZones, node.Status.Zone) {
// cannot use a node in this zone because it has a running replica
continue
}

if util.Contains(usedNodes, nodeName) {
// cannot use this node because it has a running replica
continue
}

if !node.Spec.AllowScheduling {
log.Warnf("Failed to use node %v, does not allow scheduling", nodeName)
continue
}

if isReady, _ := c.ds.CheckDataEngineImageReadiness(ei.Spec.Image, v.Spec.DataEngine, nodeName); !isReady {
log.Warnf("Failed to use node %v, image %v is not ready", nodeName, ei.Spec.Image)
continue
}

unusedZone[node.Status.Zone] = append(unusedZone[node.Status.Zone], nodeName)
}
if len(unusedZone) == 0 {
log.Debugf("Balanced, all ready zones are used by this volume")
return 0, zoneExtraRs, err
}

unevenCount := v.Spec.NumberOfReplicas - len(zoneExtraRs)
unusedCount := len(unusedZone)
adjustCount := 0
if unusedCount < unevenCount {
adjustCount = unusedCount
} else {
adjustCount = unevenCount
}
log.Infof("Found %v zone available for auto-balance duplicates in %v", adjustCount, zoneExtraRs)

return adjustCount, zoneExtraRs, err
}

CodeFactor notice (controller/volume_controller.go#L2605-L2714): Complex Method
func (c *VolumeController) listReadySchedulableAndScheduledNodesRO(volume *longhorn.Volume, rs map[string]*longhorn.Replica, log logrus.FieldLogger) (map[string]*longhorn.Node, error) {
readyNodes, err := c.ds.ListReadyAndSchedulableNodesRO()
if err != nil {
Expand Down Expand Up @@ -2773,220 +2773,220 @@
return filteredReadyNodes, nil
}

func (c *VolumeController) getReplicaCountForAutoBalanceNode(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (int, map[string][]string, error) {
log := getLoggerForVolume(c.logger, v).WithField("replicaAutoBalanceType", "node")

readyNodes, err := c.listReadySchedulableAndScheduledNodesRO(v, rs, log)
if err != nil {
return 0, nil, err
}

nodeExtraRs := make(map[string][]string)
for _, r := range rs {
if r.Status.CurrentState != longhorn.InstanceStateRunning {
continue
}

_, exist := readyNodes[r.Spec.NodeID]
if !exist {
log.Warnf("Node %v is not ready or schedulable, replica %v could get evicted", r.Spec.NodeID, r.Name)
continue
}

nodeID := r.Spec.NodeID
_, isDuplicate := nodeExtraRs[nodeID]
if isDuplicate {
nodeExtraRs[nodeID] = append(nodeExtraRs[nodeID], r.Name)
} else {
nodeExtraRs[nodeID] = []string{}
}

if len(nodeExtraRs[nodeID]) > v.Spec.NumberOfReplicas {
msg := fmt.Sprintf("Too many replicas running on node %v", nodeExtraRs[nodeID])
log.WithField("nodeID", nodeID).Warn(msg)
return 0, nil, nil
}
}

if v.Spec.NumberOfReplicas == len(nodeExtraRs) {
log.Debugf("Balanced, volume replicas are running on different nodes")
return 0, nodeExtraRs, nil
}

ei := &longhorn.EngineImage{}
if types.IsDataEngineV1(v.Spec.DataEngine) {
ei, err = c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
return 0, nodeExtraRs, err
}
}

for nodeName, node := range readyNodes {
_, exist := nodeExtraRs[nodeName]
if exist {
continue
}

if !node.Spec.AllowScheduling {
log.Warnf("Failed to use node %v, does not allow scheduling", nodeName)
delete(readyNodes, nodeName)
continue
}

if isReady, _ := c.ds.CheckDataEngineImageReadiness(ei.Spec.Image, v.Spec.DataEngine, node.Name); !isReady {
log.Warnf("Failed to use node %v, image %v is not ready", nodeName, ei.Spec.Image)
delete(readyNodes, nodeName)
continue
}
}

if len(nodeExtraRs) == len(readyNodes) {
log.Debugf("Balanced, all ready nodes are used by this volume")
return 0, nodeExtraRs, nil
}

unevenCount := v.Spec.NumberOfReplicas - len(nodeExtraRs)
unusedCount := len(readyNodes) - len(nodeExtraRs)
adjustCount := 0
if unusedCount < unevenCount {
adjustCount = unusedCount
} else {
adjustCount = unevenCount
}
if adjustCount < 0 {
adjustCount = 0
}
log.Infof("Found %v node available for auto-balance duplicates in %v", adjustCount, nodeExtraRs)

return adjustCount, nodeExtraRs, err
}

CodeFactor notice (controller/volume_controller.go#L2776-L2863): Complex Method
func (c *VolumeController) getReplenishReplicasCount(v *longhorn.Volume, rs map[string]*longhorn.Replica, e *longhorn.Engine) (int, string) {
usableCount := 0
for _, r := range rs {
// The local replica that failed to be scheduled shouldn't be counted
if isDataLocalityBestEffort(v) && r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == "" &&
v.Status.CurrentNodeID != "" && r.Spec.HardNodeAffinity == v.Status.CurrentNodeID {
continue
}
// Skip the replica that has been requested for eviction.
if r.Spec.FailedAt == "" && (!r.Spec.EvictionRequested) && r.Spec.Active {
usableCount++
}
}

// Only create 1 replica while volume is in cloning process
if isCloningRequiredAndNotCompleted(v) {
if usableCount == 0 {
return 1, ""
}
return 0, ""
}

switch {
case v.Spec.NumberOfReplicas < usableCount:
return 0, ""
case v.Spec.NumberOfReplicas > usableCount:
return v.Spec.NumberOfReplicas - usableCount, ""
case v.Spec.NumberOfReplicas == usableCount:
if adjustCount := c.getReplicaCountForAutoBalanceLeastEffort(v, e, rs, c.getReplicaCountForAutoBalanceZone); adjustCount != 0 {
return adjustCount, ""
}
if adjustCount := c.getReplicaCountForAutoBalanceLeastEffort(v, e, rs, c.getReplicaCountForAutoBalanceNode); adjustCount != 0 {
return adjustCount, ""
}

var nCandidates []string
adjustCount, _, nCandidates := c.getReplicaCountForAutoBalanceBestEffort(v, e, rs, c.getReplicaCountForAutoBalanceNode)
if adjustCount == 0 {
	// Assign to the outer adjustCount so that the zone-based result
	// reaches the candidate check below.
	var zCandidates []string
	adjustCount, _, zCandidates = c.getReplicaCountForAutoBalanceBestEffort(v, e, rs, c.getReplicaCountForAutoBalanceZone)
	if adjustCount != 0 {
		nCandidates = c.getNodeCandidatesForAutoBalanceZone(v, e, rs, zCandidates)
	}
}
if adjustCount != 0 && len(nCandidates) != 0 {
// TODO: https://github.com/longhorn/longhorn/issues/2667
return adjustCount, nCandidates[0]
}

return adjustCount, ""
}
return 0, ""
}
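// Illustrative sketch, not part of this diff: ignoring the cloning and
// auto-balance branches above, the replenish count reduces to "desired minus
// usable, never negative". Hypothetical helper for illustration only.
func replenishCount(desiredReplicas, usableReplicas int) int {
	if desiredReplicas <= usableReplicas {
		return 0
	}
	return desiredReplicas - usableReplicas
}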

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L2864-L2916
func (c *VolumeController) getNodeCandidatesForAutoBalanceZone(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica, zones []string) (candidateNames []string) {
log := getLoggerForVolume(c.logger, v).WithFields(
logrus.Fields{
"replicaAutoBalanceOption": longhorn.ReplicaAutoBalanceBestEffort,
"replicaAutoBalanceType": "zone",
},
)

var err error
defer func() {
if err != nil {
log.WithError(err).Warn("Skip replica zone auto-balance")
}
}()

if len(zones) == 0 {
return candidateNames
}

if v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
log.Warnf("Failed to auto-balance volume in %s state", v.Status.Robustness)
return candidateNames
}

readyNodes, err := c.ds.ListReadyAndSchedulableNodesRO()
if err != nil {
return candidateNames
}

ei := &longhorn.EngineImage{}
if types.IsDataEngineV1(v.Spec.DataEngine) {
ei, err = c.getEngineImageRO(v.Status.CurrentImage)
if err != nil {
return candidateNames
}
}
for nName, n := range readyNodes {
	isInCandidateZone := false
	for _, zone := range zones {
		if n.Status.Zone == zone {
			isInCandidateZone = true
			break
		}
	}
	if !isInCandidateZone {
		// cannot use node, it is not in any candidate zone
		delete(readyNodes, nName)
		continue
	}

if !n.Spec.AllowScheduling {
// cannot use node, does not allow scheduling.
delete(readyNodes, nName)
continue
}

if isReady, _ := c.ds.CheckDataEngineImageReadiness(ei.Spec.Image, v.Spec.DataEngine, nName); !isReady {
// cannot use node, engine image is not ready
delete(readyNodes, nName)
continue
}

for _, r := range rs {
if r.Spec.NodeID == nName {
delete(readyNodes, nName)
break
}
}
}

for nName := range readyNodes {
candidateNames = append(candidateNames, nName)
}
if len(candidateNames) != 0 {
log.Infof("Found node candidates: %v ", candidateNames)
}
return candidateNames
}
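// Illustrative sketch, not part of this diff: the node filtering above keeps
// only ready nodes that sit in one of the candidate zones, allow scheduling,
// have a ready engine image, and do not already host a replica. A minimal,
// dependency-free version of the zone membership test (hypothetical helper):
func nodeInCandidateZones(nodeZone string, zones []string) bool {
	for _, zone := range zones {
		if zone == nodeZone {
			return true
		}
	}
	return false
}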

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L2917-L2989
func (c *VolumeController) hasEngineStatusSynced(e *longhorn.Engine, rs map[string]*longhorn.Replica) bool {
connectedReplicaCount := 0
for _, r := range rs {
@@ -3013,160 +3013,160 @@
return true
}

func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
if len(es) > 1 {
return nil
}

e, err := c.ds.PickVolumeCurrentEngine(v, es)
if err != nil {
return err
}
if e == nil {
return nil
}

log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
"volumeDesiredEngineImage": v.Spec.Image,
})

if !isVolumeUpgrading(v) {
// it must be a rollback
if e.Spec.Image != v.Spec.Image {
e.Spec.Image = v.Spec.Image
e.Spec.UpgradedReplicaAddressMap = map[string]string{}
return nil
}
// If the engine already has the new engine image or it has been stopped,
// live upgrade is not needed, so there is no point in keeping e.Spec.UpgradedReplicaAddressMap
if e.Spec.Image == e.Status.CurrentImage || e.Status.CurrentImage == "" {
e.Spec.UpgradedReplicaAddressMap = map[string]string{}
}
return nil
}

// If volume is detached accidentally during the live upgrade,
// the live upgrade info and the inactive replicas are meaningless.
if v.Status.State == longhorn.VolumeStateDetached {
if e.Spec.Image != v.Spec.Image {
e.Spec.Image = v.Spec.Image
e.Spec.UpgradedReplicaAddressMap = map[string]string{}
}
for _, r := range rs {
if r.Spec.Image != v.Spec.Image {
r.Spec.Image = v.Spec.Image
rs[r.Name] = r
}
if !r.Spec.Active {
log.Infof("Removing inactive replica %v when the volume is detached accidentally during the live upgrade", r.Name)
if err := c.deleteReplica(r, rs); err != nil {
return err
}
}
}
// TODO: current replicas should be calculated by checking whether any
// image other than v.Spec.Image still exists
v.Status.CurrentImage = v.Spec.Image
return nil
}

// Only start live upgrade if volume is healthy
if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
if v.Status.State != longhorn.VolumeStateAttached || v.Status.Robustness != longhorn.VolumeRobustnessDegraded {
return nil
}
// Clean up inactive replica which doesn't have any matching replica.
// This will allow volume controller to schedule and rebuild a new active replica
// and make the volume healthy again for the live upgrade to be able to continue.
// https://github.com/longhorn/longhorn/issues/7012

// sort the replicas by name so that we don't select multiple replicas when running this logic repeatedly in quick succession
var sortedReplicaNames []string
for rName := range rs {
sortedReplicaNames = append(sortedReplicaNames, rName)
}
sort.Strings(sortedReplicaNames)

var inactiveReplicaToCleanup *longhorn.Replica
for _, rName := range sortedReplicaNames {
r := rs[rName]
if !r.Spec.Active && !hasMatchingReplica(r, rs) {
inactiveReplicaToCleanup = r
break
}
}
if inactiveReplicaToCleanup != nil {
log.Infof("Cleaning up inactive replica %v which doesn't have any matching replica", inactiveReplicaToCleanup.Name)
if err := c.deleteReplica(inactiveReplicaToCleanup, rs); err != nil {
return errors.Wrapf(err, "failed to cleanup inactive replica %v", inactiveReplicaToCleanup.Name)
}
return nil
}

// When the volume is degraded, even though we don't start the live engine upgrade, try to mark it as finished if it already started
c.finishLiveEngineUpgrade(v, e, rs, log)

return nil
}

volumeAndReplicaNodes := []string{v.Status.CurrentNodeID}
for _, r := range rs {
if r.Spec.NodeID == "" {
continue
}
volumeAndReplicaNodes = append(volumeAndReplicaNodes, r.Spec.NodeID)
}

if types.IsDataEngineV1(v.Spec.DataEngine) {
if err := c.checkOldAndNewEngineImagesForLiveUpgrade(v, volumeAndReplicaNodes...); err != nil {
log.Warnf("%v", err)
return nil
}

_, dataPathToOldRunningReplica, dataPathToNewReplica := c.groupReplicasByImageAndState(v, e, rs)

// Skip checking and creating new replicas for the 2 cases:
// 1. Volume is degraded.
// 2. The new replicas are activated and all old replicas are already purged.
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica,
func(r *longhorn.Replica, engineImage string) {
r.Spec.Image = engineImage
},
v.Spec.Image); err != nil {
return err
}
}

if e.Spec.Image != v.Spec.Image {
replicaAddressMap, err := c.constructReplicaAddressMap(v, e, dataPathToNewReplica)
if err != nil {
return nil
}

// Only upgrade e.Spec.Image if there are enough new upgraded replicas.
// This prevents a deadlock in the case that one engine image upgrade
// is followed immediately by another upgrade.
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap is empty.
// Therefore, dataPathToOldRunningReplica, dataPathToNewReplica, and replicaAddressMap are also empty.
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
// On the other hand, the engine controller blocks the engine's status from being refreshed
// and keeps e.Status.ReplicaModeMap as an empty map. The system enters a deadlock for the volume.
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
e.Spec.Image = v.Spec.Image
}
}
c.finishLiveEngineUpgrade(v, e, rs, log)
} else {
// TODO: Implement the logic for data engine v2
return nil
}
return nil
}
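// Illustrative sketch, not part of this diff: the deadlock guard above only
// switches the engine image once every desired replica has an upgraded
// address. Hypothetical helper that restates the counting rule.
func readyForImageSwitch(upgradedReplicaAddresses map[string]string, desiredReplicas int) bool {
	// Switching with a partial (or empty) map would leave the engine
	// controller waiting on a replica mode map that never fills in.
	return len(upgradedReplicaAddresses) == desiredReplicas
}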

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L3016-L3169
func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
"engine": e.Name,
@@ -3357,85 +3357,85 @@
return nil
}

func (c *VolumeController) checkAndFinishVolumeRestore(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) error {
log := getLoggerForVolume(c.logger, v)

if e == nil {
return nil
}
// a restore/DR volume is considered finished if the following conditions are satisfied:
// 1) The restored backup is up-to-date;
// 2) The volume is no longer a DR volume;
// 3) The restore/DR volume is either
//    3.1) in state `Healthy`;
//    3.2) or in state `Degraded` with all the scheduled replicas included in the engine
if v.Spec.FromBackup == "" {
return nil
}
isPurging := false
for _, status := range e.Status.PurgeStatus {
if status.IsPurging {
isPurging = true
break
}
}
if !(e.Spec.RequestedBackupRestore != "" && e.Spec.RequestedBackupRestore == e.Status.LastRestoredBackup &&
!v.Spec.Standby) {
return nil
}

if v.Status.IsStandby {
// For DR volume, make sure the backup volume is up-to-date and the latest backup is restored
_, bvName, _, err := backupstore.DecodeBackupURL(v.Spec.FromBackup)
if err != nil {
return errors.Wrapf(err, "failed to get backup name from volume %s backup URL %v", v.Name, v.Spec.FromBackup)
}
bv, err := c.ds.GetBackupVolumeRO(bvName)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if bv != nil {
if !bv.Status.LastSyncedAt.IsZero() &&
bv.Spec.SyncRequestedAt.After(bv.Status.LastSyncedAt.Time) {
log.Infof("Restore/DR volume needs to wait for backup volume %s update", bvName)
return nil
}
if bv.Status.LastBackupName != "" {
// If the backup CR does not exist, the Longhorn system may still be syncing up the info with the remote backup target.
// If the backup is already removed, the backup volume should receive the notification and update bv.Status.LastBackupName.
// Hence we cannot continue the activation when the backup get call returns an IsNotFound error.
b, err := c.ds.GetBackup(bv.Status.LastBackupName)
if err != nil {
return err
}
if b.Name != e.Status.LastRestoredBackup {
log.Infof("Restore/DR volume needs to restore the latest backup %s, and the current restored backup is %s", b.Name, e.Status.LastRestoredBackup)
c.enqueueVolume(v)
return nil
}
}
}
}

allScheduledReplicasIncluded, err := c.checkAllScheduledReplicasIncluded(v, e, rs)
if err != nil {
return err
}

degradedVolumeSupported, err := c.ds.GetSettingAsBool(types.SettingNameAllowVolumeCreationWithDegradedAvailability)
if err != nil {
return err
}

if !isPurging && ((v.Status.Robustness == longhorn.VolumeRobustnessHealthy && allScheduledReplicasIncluded) || (v.Status.Robustness == longhorn.VolumeRobustnessDegraded && degradedVolumeSupported)) {
log.Infof("Restore/DR volume finished with the last restored backup %s", e.Status.LastRestoredBackup)
v.Status.IsStandby = false
v.Status.RestoreRequired = false
}

return nil
}
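// Illustrative sketch, not part of this diff: the final condition above marks
// a restore/DR volume as finished only when no purge is in progress and the
// volume is either healthy with all scheduled replicas included, or degraded
// while degraded availability is allowed. Hypothetical helper with booleans
// standing in for the checks performed above.
func restoreFinished(isPurging, isHealthy, allScheduledReplicasIncluded, isDegraded, degradedAllowed bool) bool {
	return !isPurging && ((isHealthy && allScheduledReplicasIncluded) || (isDegraded && degradedAllowed))
}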

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L3360-L3438
func (c *VolumeController) checkAllScheduledReplicasIncluded(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (bool, error) {
healthReplicaCount := 0
hasReplicaNotIncluded := false
Expand Down Expand Up @@ -4022,6 +4022,18 @@
rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica,
fixupFunc func(r *longhorn.Replica, obj string), obj string) error {
log := getLoggerForVolume(c.logger, v)
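	// For data engine v2 (handled first below), the existing replica object is
	// reused for the new engine: fixupFunc (e.g. setting the migration engine
	// name) is applied to the old replica and it is registered as the replica
	// for that data path, instead of creating and starting a matching copy as
	// the v1 path does.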
if types.IsDataEngineV2(v.Spec.DataEngine) {
for path, r := range pathToOldRs {
if pathToNewRs[path] != nil {
continue
}
fixupFunc(r, obj)
rs[r.Name] = r
pathToNewRs[path] = r
}
return nil
}

for path, r := range pathToOldRs {
if pathToNewRs[path] != nil {
continue
@@ -4041,7 +4053,7 @@
return nil
}

func (c *VolumeController) deleteInvalidMigrationReplicas(rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica) error {
func (c *VolumeController) deleteInvalidMigrationReplicas(rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica, v *longhorn.Volume) error {
for path, r := range pathToNewRs {
matchTheOldReplica := pathToOldRs[path] != nil && r.Spec.DesireState == pathToOldRs[path].Spec.DesireState
newReplicaIsAvailable := r.DeletionTimestamp == nil && r.Spec.DesireState == longhorn.InstanceStateRunning &&
@@ -4050,8 +4062,12 @@
continue
}
delete(pathToNewRs, path)
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "failed to delete the new replica %v when there is no matching old replica in path %v", r.Name, path)
if types.IsDataEngineV1(v.Spec.DataEngine) {
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "failed to delete the new replica %v when there is no matching old replica in path %v", r.Name, path)
}
} else {
r.Spec.MigrationEngineName = ""
}
}
return nil
@@ -4071,308 +4087,327 @@
}
}

func (c *VolumeController) processMigration(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to process migration for %v", v.Name)
}()

if !util.IsMigratableVolume(v) {
return nil
}

log := getLoggerForVolume(c.logger, v).WithField("migrationNodeID", v.Spec.MigrationNodeID)

// cannot process migrate when upgrading
if isVolumeUpgrading(v) {
log.Warn("Skip the migration processing since the volume is being upgraded")
return nil
}

if v.Spec.MigrationNodeID == "" {
// The volume attachment controller has stopped the migration (if one was ever started). We must clean up any
// extra engines/replicas and leave the volume in a "good" state.

// The only time there should be more than one engine is when we are migrating. If there is more than one and
// we no longer have a MigrationNodeID set, we can clean up the extra engine.
if len(es) < 2 && v.Status.CurrentMigrationNodeID == "" {
return nil // There is nothing to do.
}

// The volume is no longer attached or should no longer be attached. We will clean up the migration below by
// removing the extra engine and replicas. Warn the user.
if v.Spec.NodeID == "" || v.Status.CurrentNodeID == "" {
msg := ("Volume migration failed unexpectedly; detach volume from extra node to resume")
c.eventRecorder.Event(v, corev1.EventTypeWarning, constant.EventReasonMigrationFailed, msg)
}

// This is a migration confirmation. We need to switch the CurrentNodeID to NodeID so that currentEngine becomes
// the migration engine.
if v.Spec.NodeID != "" && v.Status.CurrentNodeID != v.Spec.NodeID {
log.Infof("Volume migration complete switching current node id from %v to %v", v.Status.CurrentNodeID, v.Spec.NodeID)
v.Status.CurrentNodeID = v.Spec.NodeID
}

// The latest current engine is based on the multiple node related fields of the volume and the NodeID of all
// engines. If the new engine matches, it means a migration confirmation then it's time to remove the old
// engine. If the old engine matches, it means a migration rollback hence cleaning up the migration engine is
// required.
currentEngine, extras, err := datastore.GetNewCurrentEngineAndExtras(v, es)
if err != nil {
log.WithError(err).Warn("Failed to finalize the migration")
return nil
}
// The cleanup can be done only after the new current engine is found.
for i := range extras {
e := extras[i]
if e.DeletionTimestamp == nil {
if err := c.deleteEngine(e, es); err != nil {
return err
}
log.Infof("Removing extra engine %v after switching the current engine to %v", e.Name, currentEngine.Name)
}
}

currentEngine.Spec.Active = true

// cleanupCorruptedOrStaleReplicas() will take care of old replicas
c.switchActiveReplicas(rs, func(r *longhorn.Replica, engineName string) bool {
return r.Spec.EngineName == engineName && r.Spec.HealthyAt != ""
}, currentEngine.Name)
if types.IsDataEngineV1(v.Spec.DataEngine) {
c.switchActiveReplicas(rs, func(r *longhorn.Replica, engineName string) bool {
return r.Spec.EngineName == engineName && r.Spec.HealthyAt != ""
}, currentEngine.Name)
} else {
for _, r := range rs {
r.Spec.MigrationEngineName = ""
r.Spec.EngineName = currentEngine.Name
}
}

// migration rollback or confirmation finished
v.Status.CurrentMigrationNodeID = ""

return nil
}

// Only enter the migration flow when the volume is attached and running. Otherwise, the volume attachment
// controller will detach the volume soon and not reattach it until we resolve the migration above.
if v.Spec.NodeID == "" || v.Status.CurrentNodeID == "" || len(es) == 0 {
return nil
}
if v.Status.Robustness != longhorn.VolumeRobustnessDegraded && v.Status.Robustness != longhorn.VolumeRobustnessHealthy {
log.Warnf("Skip the migration processing since the volume current robustness is %v", v.Status.Robustness)
return nil
}

// init the volume migration
if v.Status.CurrentMigrationNodeID == "" {
v.Status.CurrentMigrationNodeID = v.Spec.MigrationNodeID
}

revertRequired := false
defer func() {
if !revertRequired {
return
}

log.Warnf("Cleaning up the migration engine and all migration replicas to revert migration")

currentEngine, err2 := c.getCurrentEngineAndCleanupOthers(v, es)
if err2 != nil {
err = errors.Wrapf(err, "failed to get the current engine and clean up others during the migration revert: %v", err2)
return
}

for _, r := range rs {
	if r.Spec.EngineName == currentEngine.Name {
		continue
	}
	if err2 := c.deleteReplica(r, rs); err2 != nil {
		err = errors.Wrapf(err, "failed to delete the migration replica %v during the migration revert: %v", r.Name, err2)
		return
	}
}
if types.IsDataEngineV1(v.Spec.DataEngine) {
	for _, r := range rs {
		if r.Spec.EngineName == currentEngine.Name {
			continue
		}
		if err2 := c.deleteReplica(r, rs); err2 != nil {
			err = errors.Wrapf(err, "failed to delete the migration replica %v during the migration revert: %v", r.Name, err2)
			return
		}
	}
} else {
	for _, r := range rs {
		r.Spec.MigrationEngineName = ""
	}
}
}()

currentEngine, extras, err := datastore.GetCurrentEngineAndExtras(v, es)
if err != nil {
return err
}

if currentEngine.Status.CurrentState != longhorn.InstanceStateRunning {
revertRequired = true
log.Warnf("Need to revert the migration since the current engine %v is state %v", currentEngine.Name, currentEngine.Status.CurrentState)
return nil
}

var availableEngines int
var migrationEngine *longhorn.Engine
for _, extra := range extras {
if extra.DeletionTimestamp != nil {
continue
}

// a valid migration engine is either a newly created engine that hasn't been assigned a node yet
// or an existing engine that is running on the migration node; any additional extra engines would be unexpected.
availableEngines++
isValidMigrationEngine := extra.Spec.NodeID == "" || extra.Spec.NodeID == v.Spec.MigrationNodeID
if isValidMigrationEngine {
migrationEngine = extra
}
}

// verify that we are in a valid state for migration
// we only consider engines without a deletion timestamp as candidates for the migration engine
unexpectedEngineCount := availableEngines > 1
invalidMigrationEngine := availableEngines > 0 && migrationEngine == nil
if unexpectedEngineCount || invalidMigrationEngine {
revertRequired = true
log.Warnf("Unexpected state for migration, current engine count %v has invalid migration engine %v",
len(es), invalidMigrationEngine)
return nil
}

if migrationEngine == nil {
migrationEngine, err = c.createEngine(v, currentEngine.Name)
if err != nil {
return err
}
es[migrationEngine.Name] = migrationEngine
}

log = log.WithField("migrationEngine", migrationEngine.Name)

ready := false
if ready, revertRequired, err = c.prepareReplicasAndEngineForMigration(v, currentEngine, migrationEngine, rs); err != nil {
return err
}
if !ready || revertRequired {
return nil
}

log.Info("Volume migration engine is ready")
return nil
}
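// Illustrative sketch, not part of this diff: once Spec.MigrationNodeID is
// cleared while the volume stays attached, the handling above treats a changed
// node ID as a confirmed migration and an unchanged one as a rollback.
// Hypothetical helper restating that check.
func migrationConfirmed(specNodeID, currentNodeID string) bool {
	// A confirmation moves ownership to the migration node, so the desired
	// node no longer matches the node the volume is currently attached to.
	return specNodeID != "" && specNodeID != currentNodeID
}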

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L4090-L4276
func (c *VolumeController) prepareReplicasAndEngineForMigration(v *longhorn.Volume, currentEngine, migrationEngine *longhorn.Engine, rs map[string]*longhorn.Replica) (ready, revertRequired bool, err error) {
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{"migrationNodeID": v.Spec.MigrationNodeID, "migrationEngine": migrationEngine.Name})

// Check the migration engine current status
if migrationEngine.Spec.NodeID != "" && migrationEngine.Spec.NodeID != v.Spec.MigrationNodeID {
log.Warnf("Migration engine is on node %v but volume is somehow required to be migrated to %v, will do revert then restart the migration",
migrationEngine.Spec.NodeID, v.Spec.MigrationNodeID)
return false, true, nil
}
if migrationEngine.DeletionTimestamp != nil {
log.Warn("Migration engine is in deletion, will start or continue migration revert")
return false, true, nil
}
if migrationEngine.Spec.DesireState != longhorn.InstanceStateStopped && migrationEngine.Spec.DesireState != longhorn.InstanceStateRunning {
log.Warnf("Need to do revert since the migration engine contains an invalid desire state %v", migrationEngine.Spec.DesireState)
return false, true, nil
}
if migrationEngine.Status.CurrentState == longhorn.InstanceStateError {
log.Errorf("The migration engine is state %v, need to do revert then retry the migration", migrationEngine.Status.CurrentState)
return false, true, nil
}

// Sync migration replicas with old replicas
currentAvailableReplicas := map[string]*longhorn.Replica{}
migrationReplicas := map[string]*longhorn.Replica{}
for _, r := range rs {
isUnavailable, err := c.IsReplicaUnavailable(r)
if err != nil {
return false, false, err
}
if isUnavailable {
continue
}
dataPath := types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName)
if r.Spec.EngineName == currentEngine.Name {
switch currentEngine.Status.ReplicaModeMap[r.Name] {
case longhorn.ReplicaModeWO:
log.Infof("Need to revert rather than starting migration since the current replica %v is mode WriteOnly, which means the rebuilding is in progress", r.Name)
return false, true, nil
case longhorn.ReplicaModeRW:
currentAvailableReplicas[dataPath] = r
case "":
if _, ok := currentEngine.Spec.ReplicaAddressMap[r.Name]; ok {
log.Infof("Need to revert rather than starting migration since the current replica %v is already in the engine spec, which means it may start rebuilding", r.Name)
return false, true, nil
}
log.Warnf("Running replica %v wasn't added to engine, will ignore it and continue migration", r.Name)
default:
log.Warnf("Unexpected mode %v for the current replica %v, will ignore it and continue migration", currentEngine.Status.ReplicaModeMap[r.Name], r.Name)
}
} else if r.Spec.EngineName == migrationEngine.Name {
migrationReplicas[dataPath] = r
} else if r.Spec.MigrationEngineName == migrationEngine.Name {
migrationReplicas[dataPath] = r
} else {
log.Warnf("During migration found unknown replica with engine %v, will directly remove it", r.Spec.EngineName)
if err := c.deleteReplica(r, rs); err != nil {
return false, false, err
}
}
}

if err := c.deleteInvalidMigrationReplicas(rs, currentAvailableReplicas, migrationReplicas); err != nil {
if err := c.deleteInvalidMigrationReplicas(rs, currentAvailableReplicas, migrationReplicas, v); err != nil {
return false, false, err
}

if err := c.createAndStartMatchingReplicas(v, rs, currentAvailableReplicas, migrationReplicas, func(r *longhorn.Replica, engineName string) {
r.Spec.EngineName = engineName
if types.IsDataEngineV1(v.Spec.DataEngine) {
r.Spec.EngineName = engineName
} else {
r.Spec.MigrationEngineName = engineName
}
}, migrationEngine.Name); err != nil {
return false, false, err
}

if len(migrationReplicas) == 0 {
log.Warnf("Volume %v: no valid migration replica during the migration, need to do revert first", v.Name)
return false, true, nil
}

// Sync the migration engine with migration replicas
replicaAddressMap := map[string]string{}
allMigrationReplicasReady := true
for _, r := range migrationReplicas {
if r.Status.CurrentState != longhorn.InstanceStateRunning {
allMigrationReplicasReady = false
continue
}
if r.Status.IP == "" {
log.Warnf("Replica %v is running but IP is empty", r.Name)
continue
}
if r.Status.StorageIP == "" {
log.Warnf("Replica %v is running but storage IP is empty", r.Name)
continue
}
if r.Status.Port == 0 {
log.Warnf("Replica %v is running but Port is empty", r.Name)
continue
}
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
}
if migrationEngine.Spec.DesireState != longhorn.InstanceStateStopped {
if len(replicaAddressMap) == 0 {
log.Warn("No available migration replica for the current running engine, will direct do revert")
return false, true, nil
}
// If there are some migration replicas not ready yet or not in the engine, need to restart the engine so that
// the missed replicas can be added to the engine without rebuilding.
if !allMigrationReplicasReady || !reflect.DeepEqual(migrationEngine.Spec.ReplicaAddressMap, replicaAddressMap) {
log.Warn("The current available migration replicas do not match the record in the migration engine status, will restart the migration engine then update the replica map")
migrationEngine.Spec.NodeID = ""
migrationEngine.Spec.ReplicaAddressMap = map[string]string{}
migrationEngine.Spec.DesireState = longhorn.InstanceStateStopped
return false, false, nil
}
} else { // migrationEngine.Spec.DesireState == longhorn.InstanceStateStopped
if migrationEngine.Status.CurrentState != longhorn.InstanceStateStopped || !allMigrationReplicasReady {
return false, false, nil
}
}

migrationEngine.Spec.NodeID = v.Spec.MigrationNodeID
migrationEngine.Spec.ReplicaAddressMap = replicaAddressMap
migrationEngine.Spec.DesireState = longhorn.InstanceStateRunning

if migrationEngine.Status.CurrentState != longhorn.InstanceStateRunning {
return false, false, nil
}

return true, false, nil
}
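// Illustrative sketch, not part of this diff: a migration replica only
// contributes an address to the migration engine once it is running and has
// an IP, a storage IP, and a port. The struct and helper below are
// hypothetical stand-ins for the status fields checked above.
type migrationReplicaStatusSketch struct {
	State     string
	IP        string
	StorageIP string
	Port      int
}

func migrationReplicaReady(s migrationReplicaStatusSketch) bool {
	return s.State == "running" && s.IP != "" && s.StorageIP != "" && s.Port != 0
}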

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L4277-L4410
func (c *VolumeController) IsReplicaUnavailable(r *longhorn.Replica) (bool, error) {
if r.Spec.NodeID == "" || r.Spec.DiskID == "" || r.Spec.DiskPath == "" || r.Spec.DataDirectoryName == "" {
return true, nil
@@ -4405,66 +4440,66 @@
// isResponsibleFor picks a running node that has the default engine image deployed.
// We need the default engine image deployed on the node to perform operations like backups.
// Prefer picking the node v.Spec.NodeID if it meets the above requirement.
func (c *VolumeController) isResponsibleFor(v *longhorn.Volume, defaultEngineImage string) (bool, error) {
var err error
defer func() {
err = errors.Wrap(err, "error while checking isResponsibleFor")
}()

// If a regular RWX is delinquent, try to switch ownership quickly to the owner node of the share manager CR
isOwnerNodeDelinquent, err := c.ds.IsNodeDelinquent(v.Status.OwnerID, v.Name)
if err != nil {
return false, err
}
isSpecNodeDelinquent, err := c.ds.IsNodeDelinquent(v.Spec.NodeID, v.Name)
if err != nil {
return false, err
}
preferredOwnerID := v.Spec.NodeID
if isOwnerNodeDelinquent || isSpecNodeDelinquent {
sm, err := c.ds.GetShareManager(v.Name)
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}
if sm != nil {
preferredOwnerID = sm.Status.OwnerID
}
}

isResponsible := isControllerResponsibleFor(c.controllerID, c.ds, v.Name, preferredOwnerID, v.Status.OwnerID)

if types.IsDataEngineV1(v.Spec.DataEngine) {
readyNodesWithDefaultEI, err := c.ds.ListReadyNodesContainingEngineImageRO(defaultEngineImage)
if err != nil {
return false, err
}
// No node in the system has the default engine image,
// so fall back to the default logic where we pick a running node to be the owner
if len(readyNodesWithDefaultEI) == 0 {
return isResponsible, nil
}
}

preferredOwnerDataEngineAvailable, err := c.ds.CheckDataEngineImageReadiness(defaultEngineImage, v.Spec.DataEngine, v.Spec.NodeID)
if err != nil {
return false, err
}
currentOwnerDataEngineAvailable, err := c.ds.CheckDataEngineImageReadiness(defaultEngineImage, v.Spec.DataEngine, v.Status.OwnerID)
if err != nil {
return false, err
}
currentNodeDataEngineAvailable, err := c.ds.CheckDataEngineImageReadiness(defaultEngineImage, v.Spec.DataEngine, c.controllerID)
if err != nil {
return false, err
}

isPreferredOwner := currentNodeDataEngineAvailable && isResponsible
continueToBeOwner := currentNodeDataEngineAvailable && !preferredOwnerDataEngineAvailable && c.controllerID == v.Status.OwnerID
requiresNewOwner := currentNodeDataEngineAvailable && !preferredOwnerDataEngineAvailable && !currentOwnerDataEngineAvailable

return isPreferredOwner || continueToBeOwner || requiresNewOwner, nil
}
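// Illustrative sketch, not part of this diff: the return above reduces to
// three conditions on data engine image readiness. Hypothetical helper with
// booleans standing in for the datastore checks.
func shouldOwnVolume(isResponsible, preferredOwnerReady, currentOwnerReady, thisNodeReady, thisNodeIsCurrentOwner bool) bool {
	isPreferredOwner := thisNodeReady && isResponsible
	continueToBeOwner := thisNodeReady && !preferredOwnerReady && thisNodeIsCurrentOwner
	requiresNewOwner := thisNodeReady && !preferredOwnerReady && !currentOwnerReady
	return isPreferredOwner || continueToBeOwner || requiresNewOwner
}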

Check notice (codefactor.io / CodeFactor): Complex Method, controller/volume_controller.go#L4443-L4502
func (c *VolumeController) deleteReplica(r *longhorn.Replica, rs map[string]*longhorn.Replica) error {
// Must call Update before removal to keep the fields up to date
if _, err := c.ds.UpdateReplica(r); err != nil {