diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 799d2e93c11..9c9403c948c 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -602,39 +602,52 @@ func (be *BuiltinBackupEngine) backupFiles( // Backup with the provided concurrency. sema := semaphore.NewWeighted(int64(params.Concurrency)) wg := sync.WaitGroup{} + + ctxCancel, cancel := context.WithCancel(ctx) + defer cancel() + for i := range fes { wg.Add(1) go func(i int) { defer wg.Done() fe := &fes[i] // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctx, 1) + acqErr := sema.Acquire(ctxCancel, 1) if acqErr != nil { log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error()) bh.RecordError(acqErr) + cancel() return } defer sema.Release(1) + + // First check if we have any error, if we have, there is no point trying backing up this file. + // We check for errors before checking if the context is canceled on purpose, if there was an + // error, the context would have been canceled already. + if bh.HasErrors() { + params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error()) + return + } + // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, // which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces // unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check. select { - case <-ctx.Done(): + case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q backup", fe.Name) bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return default: } - if bh.HasErrors() { - params.Logger.Infof("failed to backup files due to error.") - return - } - // Backup the individual file. name := fmt.Sprintf("%v", i) - bh.RecordError(be.backupFile(ctx, params, bh, fe, name)) + err := be.backupFile(ctxCancel, params, bh, fe, name) + if err != nil { + bh.RecordError(acqErr) + cancel() + } }(i) } @@ -766,7 +779,7 @@ func (bp *backupPipe) HashString() string { return hex.EncodeToString(bp.crc32.Sum(nil)) } -func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger, restore bool) { +func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool) { messageStr := "restoring " if !restore { messageStr = "backing up " @@ -775,6 +788,9 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger defer tick.Stop() for { select { + case <-ctx.Done(): + logger.Infof("Canceled %s of %q file", messageStr, bp.filename) + return case <-bp.done: logger.Infof("Completed %s %q", messageStr, bp.filename) return @@ -821,7 +837,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara } br := newBackupReader(fe.Name, fi.Size(), timedSource) - go br.ReportProgress(builtinBackupProgress, params.Logger, false /*restore*/) + go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/) // Open the destination file for writing, and a buffer. params.Logger.Infof("Backing up file: %v", fe.Name) @@ -1020,43 +1036,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP sema := semaphore.NewWeighted(int64(params.Concurrency)) rec := concurrency.AllErrorRecorder{} wg := sync.WaitGroup{} + + ctxCancel, cancel := context.WithCancel(ctx) + defer cancel() + for i := range fes { wg.Add(1) go func(i int) { defer wg.Done() fe := &fes[i] // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctx, 1) + acqErr := sema.Acquire(ctxCancel, 1) if acqErr != nil { log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error()) rec.RecordError(acqErr) + cancel() return } defer sema.Release(1) + + // First check if we have any error, if we have, there is no point trying to restore this file. + // We check for errors before checking if the context is canceled on purpose, if there was an + // error, the context would have been canceled already. + if rec.HasErrors() { + params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error()) + return + } + // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, // which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces // unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check. select { - case <-ctx.Done(): + case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q restore", fe.Name) rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return default: } - if rec.HasErrors() { - params.Logger.Infof("Failed to restore files due to error.") - return - } - fe.ParentPath = createdDir // And restore the file. name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fe.Name) - err := be.restoreFile(ctx, params, bh, fe, bm, name) + err := be.restoreFile(ctxCancel, params, bh, fe, bm, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name)) + cancel() } }(i) } @@ -1086,7 +1112,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa }() br := newBackupReader(name, 0, timedSource) - go br.ReportProgress(builtinBackupProgress, params.Logger, true /*restore*/) + go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true) var reader io.Reader = br // Open the destination file for writing.