From db70c6b3929f48bfa72442002977c126cb6b56f0 Mon Sep 17 00:00:00 2001 From: Florent Poinsard <35779988+frouioui@users.noreply.github.com> Date: Mon, 30 Sep 2024 15:33:17 -0600 Subject: [PATCH 1/2] Fail fast when builtinbackup fails to restore a single file (#16856) Signed-off-by: Florent Poinsard Signed-off-by: Florent Poinsard <35779988+frouioui@users.noreply.github.com> Co-authored-by: Matt Lord Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/builtinbackupengine.go | 64 +++++++++++++++++++-------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 94ed7bdee6a..4b9a0fa4269 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -611,39 +611,52 @@ func (be *BuiltinBackupEngine) backupFiles( // Backup with the provided concurrency. sema := semaphore.NewWeighted(int64(params.Concurrency)) wg := sync.WaitGroup{} + + ctxCancel, cancel := context.WithCancel(ctx) + defer cancel() + for i := range fes { wg.Add(1) go func(i int) { defer wg.Done() fe := &fes[i] // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctx, 1) + acqErr := sema.Acquire(ctxCancel, 1) if acqErr != nil { log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error()) bh.RecordError(acqErr) + cancel() return } defer sema.Release(1) + + // First check if we have any error, if we have, there is no point trying to back up this file. + // We check for errors before checking if the context is canceled on purpose, if there was an + // error, the context would have been canceled already. + if bh.HasErrors() { + params.Logger.Errorf("Failed to backup files due to error: %v", bh.Error()) + return + } + // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. 
Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, // which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces // unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check. select { - case <-ctx.Done(): + case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q backup", fe.Name) bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return default: } - if bh.HasErrors() { - params.Logger.Infof("failed to backup files due to error.") - return - } - // Backup the individual file. name := fmt.Sprintf("%v", i) - bh.RecordError(be.backupFile(ctx, params, bh, fe, name)) + err := be.backupFile(ctxCancel, params, bh, fe, name) + if err != nil { + bh.RecordError(err) + cancel() + } }(i) } @@ -775,11 +788,14 @@ func (bp *backupPipe) HashString() string { return hex.EncodeToString(bp.crc32.Sum(nil)) } -func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger) { +func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger) { tick := time.NewTicker(period) defer tick.Stop() for { select { + case <-ctx.Done(): + logger.Infof("Canceled %q file", bp.filename) + return case <-bp.done: logger.Infof("Done taking Backup %q", bp.filename) return @@ -822,7 +838,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } br := newBackupReader(fe.Name, fi.Size(), timedSource) - go br.ReportProgress(builtinBackupProgress, params.Logger) + go br.ReportProgress(ctx, builtinBackupProgress, params.Logger) // Open the destination file for writing, and a buffer. 
params.Logger.Infof("Backing up file: %v", fe.Name) @@ -1021,43 +1037,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP sema := semaphore.NewWeighted(int64(params.Concurrency)) rec := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} + + ctxCancel, cancel := context.WithCancel(ctx) + defer cancel() + for i := range fes { wg.Add(1) go func(i int) { defer wg.Done() fe := &fes[i] // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctx, 1) + acqErr := sema.Acquire(ctxCancel, 1) if acqErr != nil { log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error()) rec.RecordError(acqErr) + cancel() return } defer sema.Release(1) + + // First check if we have any error, if we have, there is no point trying to restore this file. + // We check for errors before checking if the context is canceled on purpose, if there was an + // error, the context would have been canceled already. + if rec.HasErrors() { + params.Logger.Errorf("Failed to restore files due to error: %v", rec.Error()) + return + } + // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, // which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces // unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check. select { - case <-ctx.Done(): + case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q restore", fe.Name) rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return default: } - if rec.HasErrors() { - params.Logger.Infof("Failed to restore files due to error.") - return - } - fe.ParentPath = createdDir // And restore the file. 
name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fe.Name) - err := be.restoreFile(ctx, params, bh, fe, bm, name) + err := be.restoreFile(ctxCancel, params, bh, fe, bm, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name)) + cancel() } }(i) } @@ -1087,7 +1113,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa }() br := newBackupReader(name, 0, timedSource) - go br.ReportProgress(builtinBackupProgress, params.Logger) + go br.ReportProgress(ctx, builtinBackupProgress, params.Logger) var reader io.Reader = br // Open the destination file for writing. From 7a930fff4881ac7902a23692634c85b00300ad14 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Wed, 2 Oct 2024 15:09:20 -0600 Subject: [PATCH 2/2] Fix tests Signed-off-by: Florent Poinsard --- go/vt/mysqlctl/backup_blackbox_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/backup_blackbox_test.go index 4508c4e4306..eafe34f7f07 100644 --- a/go/vt/mysqlctl/backup_blackbox_test.go +++ b/go/vt/mysqlctl/backup_blackbox_test.go @@ -402,7 +402,7 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) { require.Error(t, err) - // all four files will fail - require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") + // the backup fails fast: at least one file reports the canceled context, the others may bail out without recording their own error + require.ErrorContains(t, err, "context canceled") assert.Equal(t, mysqlctl.BackupUnusable, backupResult) }