Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] Fail fast when builtinbackup fails to restore a single file (#16856) #16867

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backup_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) {

require.Error(t, err)
// all four files will fail
require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled")
require.ErrorContains(t, err, "context canceled")
assert.Equal(t, mysqlctl.BackupUnusable, backupResult)
}

Expand Down
64 changes: 45 additions & 19 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,39 +611,52 @@
// Backup with the provided concurrency.
sema := semaphore.NewWeighted(int64(params.Concurrency))
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if bh.HasErrors() {
params.Logger.Infof("failed to backup files due to error.")
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
bh.RecordError(be.backupFile(ctx, params, bh, fe, name))
err := be.backupFile(ctxCancel, params, bh, fe, name)
if err != nil {
bh.RecordError(acqErr)
cancel()

Check warning on line 658 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L657-L658

Added lines #L657 - L658 were not covered by tests
}
}(i)
}

Expand Down Expand Up @@ -775,11 +788,14 @@
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger) {
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %q file", bp.filename)
return

Check warning on line 798 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L796-L798

Added lines #L796 - L798 were not covered by tests
case <-bp.done:
logger.Infof("Done taking Backup %q", bp.filename)
return
Expand Down Expand Up @@ -822,7 +838,7 @@
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1021,43 +1037,53 @@
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()

Check warning on line 1086 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L1086

Added line #L1086 was not covered by tests
}
}(i)
}
Expand Down Expand Up @@ -1087,7 +1113,7 @@
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down
Loading