Skip to content

Commit

Permalink
[release-18.0] Fail fast when builtinbackup fails to restore a single…
Browse files Browse the repository at this point in the history
… file (#16856) (#16866)

Signed-off-by: Florent Poinsard <[email protected]>
Signed-off-by: Florent Poinsard <[email protected]>
Co-authored-by: Florent Poinsard <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Florent Poinsard <[email protected]>
  • Loading branch information
4 people authored Oct 4, 2024
1 parent 2475b12 commit ce945a3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backup_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) {

require.Error(t, err)
// all four files will fail
require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled")
require.ErrorContains(t, err, "context canceled")
assert.False(t, ok)
}

Expand Down
64 changes: 45 additions & 19 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,39 +598,52 @@ func (be *BuiltinBackupEngine) backupFiles(
// Backup with the provided concurrency.
sema := semaphore.NewWeighted(int64(params.Concurrency))
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if bh.HasErrors() {
params.Logger.Infof("failed to backup files due to error.")
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
bh.RecordError(be.backupFile(ctx, params, bh, fe, name))
err := be.backupFile(ctxCancel, params, bh, fe, name)
if err != nil {
bh.RecordError(acqErr)
cancel()
}
}(i)
}

Expand Down Expand Up @@ -761,11 +774,14 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger) {
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %q file", bp.filename)
return
case <-bp.done:
logger.Infof("Done taking Backup %q", bp.filename)
return
Expand Down Expand Up @@ -808,7 +824,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1007,43 +1023,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()
}
}(i)
}
Expand Down Expand Up @@ -1073,7 +1099,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down

0 comments on commit ce945a3

Please sign in to comment.