Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fall back to other decompression method if the current one fails #167

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions backupbackingimage/backupbackingimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,11 @@ func restoreBlockToFile(bsDriver backupstore.BackupStoreDriver, backingImageFile
return err
}
defer rc.Close()
r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)
r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
return err
if r == nil {
return err
}
}

if _, err := backingImageFile.Seek(blk.Offset, 0); err != nil {
Expand Down
19 changes: 14 additions & 5 deletions deltablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
},
}

log = logrus.WithFields(logrus.Fields{
"compressionMethod": volume.CompressionMethod,
})

// keep lock alive for async go routine.
if err := lock.Lock(); err != nil {
deltaOps.CloseSnapshot(snapshot.Name, volume.Name)
Expand All @@ -264,6 +268,7 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), 0, "", "")

log.Info("Performing delta block backup")

if progress, backup, err := performBackup(bsDriver, config, delta, deltaBackup, backupRequest.lastBackup); err != nil {
logrus.WithError(err).Errorf("Failed to perform backup for volume %v snapshot %v", volume.Name, snapshot.Name)
deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), progress, "", err.Error())
Expand Down Expand Up @@ -738,18 +743,22 @@ func restoreBlockToFile(bsDriver BackupStoreDriver, volumeName string, volDev *o
blkFile := getBlockFilePath(volumeName, blk.BlockChecksum)
rc, err := bsDriver.Read(blkFile)
if err != nil {
return err
return errors.Wrapf(err, "failed to read block %v with checksum %v", blkFile, blk.BlockChecksum)
}
defer rc.Close()
r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)

r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
return err
if r == nil {
return err
}
}

if _, err := volDev.Seek(blk.Offset, 0); err != nil {
return err
return errors.Wrapf(err, "failed to seek to offset %v for decompressed block %v", blk.Offset, blkFile)
}
_, err = io.CopyN(volDev, r, DEFAULT_BLOCK_SIZE)
return err
return errors.Wrapf(err, "failed to write decompressed block %v to volume %v", blkFile, volumeName)
}

func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRestoreConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ const (
// For lock mechanism, please refer to: https://github.com/longhorn/longhorn/blob/master/enhancements/20200701-backupstore-file-locks.md#proposal
// Currently the lock file is stored in each BackupVolume folder.
// For BackingImage Lock it is also stored there with the folder name "BACKINGIMAGE" defined here.
// To prevent Longhorn from accidently considering it as another normal BackupVolume,
// we use upppercase here so it will be filtered out when listing.
// To prevent Longhorn from accidentally considering it as another normal BackupVolume,
// we use uppercase here so it will be filtered out when listing.
BackupBackingImageLockName = "BACKINGIMAGE"
)

Expand Down
37 changes: 35 additions & 2 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,52 @@ func CompressData(method string, data []byte) (io.ReadSeeker, error) {
func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Reader, error) {
r, err := newDecompressionReader(method, src)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create decompression reader")
}
defer r.Close()
block, err := io.ReadAll(r)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read decompressed data")
}
if GetChecksum(block) != checksum {
return nil, fmt.Errorf("checksum verification failed for block")
}
return bytes.NewReader(block), nil
}

// DecompressAndVerifyWithFallback decompresses the given data and verifies the data integrity.
// If the decompression fails, it will try to decompress with the fallback method.
func DecompressAndVerifyWithFallback(decompression string, rc io.ReadCloser, checksum string) (io.Reader, error) {
derekbit marked this conversation as resolved.
Show resolved Hide resolved
r, err := DecompressAndVerify(decompression, rc, checksum)
if err == nil {
return r, nil
}
// Fall back to other decompression method if the current one fails
// The mitigation will be removed after identifying https://github.com/longhorn/longhorn/issues/7687
// Seek rc to offset 0
seeker, ok := rc.(io.Seeker)
if !ok {
return nil, errors.Wrapf(err, "failed to cast to io.Seeker for block %v", checksum)
}

_, errFallback := seeker.Seek(0, io.SeekStart)
if errFallback != nil {
// Merge the err1 and err2 and error out
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to seek to offset 0 for block %v", checksum)
}

if strings.Contains(err.Error(), gzip.ErrHeader.Error()) {
r, errFallback = DecompressAndVerify("lz4", rc, checksum)
} else if strings.Contains(err.Error(), "lz4: bad magic number") {
r, errFallback = DecompressAndVerify("gzip", rc, checksum)
}
if errFallback != nil {
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to decompress and verify block %v with fallback", checksum)
}

return r, err
}

func newCompressionWriter(method string, buffer io.Writer) (io.WriteCloser, error) {
switch method {
case "gzip":
Expand Down