Skip to content

Commit

Permalink
add a limit before failing replication check and fail when mysql fails
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Dec 9, 2024
1 parent 63dfb9e commit ee310c8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
24 changes: 24 additions & 0 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const (
phaseNameTakeNewBackup = "TakeNewBackup"
phaseStatusCatchupReplicationStalled = "Stalled"
phaseStatusCatchupReplicationStopped = "Stopped"

// We will allow maximum 60 errors in a row when waiting for replication status before taking the new backup.
// As we try every second, this is equivalent to minimum 1 minute of continuously erroring before failing.
// It allows us to ignore transient errors while avoiding repeated errors in a loop (for over 60 seconds).
maximumErrorCountWhenWaitingForReplicationStatus = 60
)

var (
Expand Down Expand Up @@ -335,6 +340,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
ctx, cancelCtx := context.WithCancel(ctx)
backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx)
mysqld.OnFailure(func(err error) {
log.Warning("Cancelling the vtbackup context as MySQL has failed")
cancelCtx()
cancelbackgroundCtx()
})

initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout)
defer initCancel()
initMysqldAt := time.Now()
Expand Down Expand Up @@ -519,8 +532,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
statusErr error

waitStartTime = time.Now()

continuousErrorCount int
)
for {
if continuousErrorCount == maximumErrorCountWhenWaitingForReplicationStatus {
return fmt.Errorf("timeout waiting for replication status after %d errors", maximumErrorCountWhenWaitingForReplicationStatus)
}

select {
case <-ctx.Done():
return fmt.Errorf("error in replication catch up: %v", ctx.Err())
Expand All @@ -531,6 +550,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continuousErrorCount++
continue
}
if status.Position.AtLeast(primaryPos) {
Expand All @@ -553,7 +573,11 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
}
continuousErrorCount++
} else {
// Since replication is working if we got here, let's reset the error count to zero.
// This allows us to avoid failing if we only have transient errors from time to time.
continuousErrorCount = 0
phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0)
}
}
Expand Down
30 changes: 21 additions & 9 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ type Mysqld struct {
capabilities capabilitySet

// mutex protects the fields below.
mutex sync.Mutex
onTermFuncs []func()
cancelWaitCmd chan struct{}
mutex sync.Mutex
onTermFuncs []func()
onFailureFuncs []func(error)
cancelWaitCmd chan struct{}

semiSyncType mysql.SemiSyncType
}
Expand Down Expand Up @@ -445,6 +446,11 @@ func (mysqld *Mysqld) startNoWait(cnf *Mycnf, mysqldArgs ...string) error {
for _, callback := range mysqld.onTermFuncs {
go callback()
}
if err != nil {
for _, failureFunc := range mysqld.onFailureFuncs {
go failureFunc(err)
}
}
mysqld.mutex.Unlock()
}
}(mysqld.cancelWaitCmd)
Expand Down Expand Up @@ -609,12 +615,12 @@ func (mysqld *Mysqld) Shutdown(ctx context.Context, cnf *Mycnf, waitForMysqld bo

// We're shutting down on purpose. We no longer want to be notified when
// mysqld terminates.
mysqld.mutex.Lock()
if mysqld.cancelWaitCmd != nil {
close(mysqld.cancelWaitCmd)
mysqld.cancelWaitCmd = nil
}
mysqld.mutex.Unlock()
// mysqld.mutex.Lock()
// if mysqld.cancelWaitCmd != nil {
// close(mysqld.cancelWaitCmd)
// mysqld.cancelWaitCmd = nil
// }
// mysqld.mutex.Unlock()

// possibly mysql is already shutdown, check for a few files first
_, socketPathErr := os.Stat(cnf.SocketFile)
Expand Down Expand Up @@ -1247,6 +1253,12 @@ func (mysqld *Mysqld) OnTerm(f func()) {
mysqld.onTermFuncs = append(mysqld.onTermFuncs, f)
}

func (mysqld *Mysqld) OnFailure(f func(error)) {
mysqld.mutex.Lock()
defer mysqld.mutex.Unlock()
mysqld.onFailureFuncs = append(mysqld.onFailureFuncs, f)
}

func buildLdPaths() ([]string, error) {
vtMysqlRoot, err := vtenv.VtMysqlRoot()
if err != nil {
Expand Down

0 comments on commit ee310c8

Please sign in to comment.