Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] Fail fast when builtinbackup fails to restore a single file (#16856) #16867

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 45 additions & 19 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,39 +611,52 @@
// Backup with the provided concurrency.
sema := semaphore.NewWeighted(int64(params.Concurrency))
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return

Check warning on line 638 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L637-L638

Added lines #L637 - L638 were not covered by tests
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if bh.HasErrors() {
params.Logger.Infof("failed to backup files due to error.")
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
bh.RecordError(be.backupFile(ctx, params, bh, fe, name))
err := be.backupFile(ctxCancel, params, bh, fe, name)
if err != nil {
bh.RecordError(acqErr)
cancel()

Check warning on line 658 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L657-L658

Added lines #L657 - L658 were not covered by tests
}
}(i)
}

Expand Down Expand Up @@ -775,11 +788,14 @@
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger) {
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %q file", bp.filename)
return
case <-bp.done:
logger.Infof("Done taking Backup %q", bp.filename)
return
Expand Down Expand Up @@ -822,7 +838,7 @@
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1021,43 +1037,53 @@
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()

Check warning on line 1086 in go/vt/mysqlctl/builtinbackupengine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/mysqlctl/builtinbackupengine.go#L1086

Added line #L1086 was not covered by tests
}
}(i)
}
Expand Down Expand Up @@ -1087,7 +1113,7 @@
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down
Loading