Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail VTBackup early when replication or MySQL is failing #17356

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const (
phaseNameTakeNewBackup = "TakeNewBackup"
phaseStatusCatchupReplicationStalled = "Stalled"
phaseStatusCatchupReplicationStopped = "Stopped"

// We will allow maximum 60 errors in a row when waiting for replication status before taking the new backup.
// As we try every second, this is equivalent to minimum 1 minute of continuously erroring before failing.
// It allows us to ignore transient errors while avoiding repeated errors in a loop (for over 60 seconds).
maximumErrorCountWhenWaitingForReplicationStatus = 60
)

var (
Expand Down Expand Up @@ -335,6 +340,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
ctx, cancelCtx := context.WithCancel(ctx)
backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx)
Comment on lines +343 to +344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Matt mentions below, the context should always be cancelled.

Suggested change
ctx, cancelCtx := context.WithCancel(ctx)
backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx)
ctx, cancelCtx := context.WithCancel(ctx)
defer cancelCtx()
backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx)
defer cancelbackgroundCtx()

mysqld.OnFailure(func(err error) {
log.Warning("Cancelling the vtbackup context as MySQL has failed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Warning("Cancelling the vtbackup context as MySQL has failed")
log.Warningf("Cancelling the vtbackup context as MySQL has failed: %v", err)

cancelCtx()
cancelbackgroundCtx()
})
Comment on lines +345 to +349
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should normally call the context cancel functions no matter what. And given that this does nothing with the error... I wonder if we can't just add an OnTerm function for this. Isn't that all we care about, stopping vtbackup when the mysqld process ends?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message would then be:

log.Warning("Cancelling the vtbackup context as MySQL has terminated")


initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout)
defer initCancel()
initMysqldAt := time.Now()
Expand Down Expand Up @@ -519,8 +532,14 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
statusErr error

waitStartTime = time.Now()

continuousErrorCount int
)
for {
if continuousErrorCount == maximumErrorCountWhenWaitingForReplicationStatus {
return fmt.Errorf("timeout waiting for replication status after %d errors", maximumErrorCountWhenWaitingForReplicationStatus)
}

select {
case <-ctx.Done():
return fmt.Errorf("error in replication catch up: %v", ctx.Err())
Expand All @@ -531,6 +550,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continuousErrorCount++
continue
}
if status.Position.AtLeast(primaryPos) {
Expand All @@ -553,7 +573,11 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
}
continuousErrorCount++
} else {
// Since replication is working if we got here, let's reset the error count to zero.
// This allows us to avoid failing if we only have transient errors from time to time.
continuousErrorCount = 0
phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0)
}
}
Expand Down
80 changes: 69 additions & 11 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,48 @@ var (
) Engine=InnoDB;`
)

func TestFailingReplication(t *testing.T) {
prepareCluster(t)

// Run the entire backup test
firstBackupTest(t, false)

// Insert one more row, the primary will be ahead of the last backup
_, err := primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test_failure')", keyspaceName, true)
require.NoError(t, err)

// Disable replication from the primary by removing the grants to 'vt_repl'.
_, err = primary.VttabletProcess.QueryTablet("REVOKE REPLICATION SLAVE ON *.* FROM 'vt_repl'@'%';", keyspaceName, true)
require.NoError(t, err)
_, err = primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true)
require.NoError(t, err)

// Take a backup with vtbackup: the process should fail entirely as it cannot replicate from the primary.
_, err = startVtBackup(t, false, false, false)
require.Error(t, err)

// keep in mind how many backups we have right now
backups, err := listBackups(shardKsName)
require.NoError(t, err)

// In 30 seconds, grant the replication permission again to 'vt_repl'.
// This will mean that vtbackup should fail to replicate for ~30 seconds, until we grant the permission again.
go func() {
<-time.After(30 * time.Second)
_, err = primary.VttabletProcess.QueryTablet("GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';", keyspaceName, true)
require.NoError(t, err)
_, err = primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true)
require.NoError(t, err)
}()

// this will initially be stuck trying to replicate from the primary, and once we re-grant the permission in
// the goroutine above, the process will work and complete successfully.
_ = vtBackup(t, false, false, false)
verifyBackupCount(t, shardKsName, len(backups)+1)

tearDown(t, true)
}

func TestTabletInitialBackup(t *testing.T) {
// Test Initial Backup Flow
// TestTabletInitialBackup will:
Expand All @@ -59,6 +101,15 @@ func TestTabletInitialBackup(t *testing.T) {
// - Bring up a second replica, and restore from the second backup
// - list the backups, remove them

prepareCluster(t)

// Run the entire backup test
firstBackupTest(t, true)

tearDown(t, true)
}

func prepareCluster(t *testing.T) {
waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})

dataPointReader := vtBackup(t, true, false, false)
Expand All @@ -84,11 +135,6 @@ func TestTabletInitialBackup(t *testing.T) {
"TabletExternallyReparented", primary.Alias)
require.NoError(t, err)
restore(t, replica1, "replica", "SERVING")

// Run the entire backup test
firstBackupTest(t, "replica")

tearDown(t, true)
}

func TestTabletBackupOnly(t *testing.T) {
Expand All @@ -107,12 +153,12 @@ func TestTabletBackupOnly(t *testing.T) {
replica1.VttabletProcess.ServingStatus = "NOT_SERVING"

initTablets(t, true, true)
firstBackupTest(t, "replica")
firstBackupTest(t, true)

tearDown(t, false)
}

func firstBackupTest(t *testing.T, tabletType string) {
func firstBackupTest(t *testing.T, removeBackup bool) {
// Test First Backup flow.
//
// firstBackupTest will:
Expand Down Expand Up @@ -168,11 +214,13 @@ func firstBackupTest(t *testing.T, tabletType string) {
// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)

removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
if removeBackup {
removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
}
}

func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
func startVtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) (*os.File, error) {
mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock")
require.NoError(t, err)
defer os.Remove(mysqlSocket.Name())
Expand Down Expand Up @@ -207,9 +255,19 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo

log.Infof("starting backup tablet %s", time.Now())
err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...)
require.NoError(t, err)
if err != nil {
return nil, err
}

f, err := os.OpenFile(statsPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
return f, nil
}

func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
f, err := startVtBackup(t, initialBackup, restartBeforeBackup, disableRedoLog)
require.NoError(t, err)
return opentsdb.NewDataPointReader(f)
}
Expand Down
18 changes: 15 additions & 3 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ type Mysqld struct {
capabilities capabilitySet

// mutex protects the fields below.
mutex sync.Mutex
onTermFuncs []func()
cancelWaitCmd chan struct{}
mutex sync.Mutex
onTermFuncs []func()
onFailureFuncs []func(error)
cancelWaitCmd chan struct{}

semiSyncType mysql.SemiSyncType
}
Expand Down Expand Up @@ -445,6 +446,11 @@ func (mysqld *Mysqld) startNoWait(cnf *Mycnf, mysqldArgs ...string) error {
for _, callback := range mysqld.onTermFuncs {
go callback()
}
if err != nil {
for _, failureFunc := range mysqld.onFailureFuncs {
go failureFunc(err)
}
}
mysqld.mutex.Unlock()
}
}(mysqld.cancelWaitCmd)
Expand Down Expand Up @@ -1247,6 +1253,12 @@ func (mysqld *Mysqld) OnTerm(f func()) {
mysqld.onTermFuncs = append(mysqld.onTermFuncs, f)
}

func (mysqld *Mysqld) OnFailure(f func(error)) {
mysqld.mutex.Lock()
defer mysqld.mutex.Unlock()
mysqld.onFailureFuncs = append(mysqld.onFailureFuncs, f)
}

func buildLdPaths() ([]string, error) {
vtMysqlRoot, err := vtenv.VtMysqlRoot()
if err != nil {
Expand Down
Loading