Skip to content

Commit

Permalink
Fall back to other decompression method if the current one fails
Browse files Browse the repository at this point in the history
The mitigation will be removed after identifying longhorn/longhorn#7687

Longhorn 7687

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Apr 17, 2024
1 parent 3a87ee0 commit 863e842
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 9 deletions.
2 changes: 1 addition & 1 deletion backupbackingimage/backupbackingimage.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func restoreBlockToFile(bsDriver backupstore.BackupStoreDriver, backingImageFile
return err
}
defer rc.Close()
r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)
r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
return err
}
Expand Down
15 changes: 11 additions & 4 deletions deltablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
},
}

log = logrus.WithFields(logrus.Fields{
"compressionMethod": volume.CompressionMethod,
})

// keep lock alive for async go routine.
if err := lock.Lock(); err != nil {
deltaOps.CloseSnapshot(snapshot.Name, volume.Name)
Expand All @@ -264,6 +268,7 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), 0, "", "")

log.Info("Performing delta block backup")

if progress, backup, err := performBackup(bsDriver, config, delta, deltaBackup, backupRequest.lastBackup); err != nil {
logrus.WithError(err).Errorf("Failed to perform backup for volume %v snapshot %v", volume.Name, snapshot.Name)
deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), progress, "", err.Error())
Expand Down Expand Up @@ -738,18 +743,20 @@ func restoreBlockToFile(bsDriver BackupStoreDriver, volumeName string, volDev *o
blkFile := getBlockFilePath(volumeName, blk.BlockChecksum)
rc, err := bsDriver.Read(blkFile)
if err != nil {
return err
return errors.Wrapf(err, "failed to read block %v from backupstore %v", blk.BlockChecksum, blkFile)
}
defer rc.Close()
r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)

r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
if err != nil {
return err
}

if _, err := volDev.Seek(blk.Offset, 0); err != nil {
return err
return errors.Wrapf(err, "failed to seek to offset %v for decompressed block %v", blk.Offset, blk.BlockChecksum)
}
_, err = io.CopyN(volDev, r, DEFAULT_BLOCK_SIZE)
return err
return errors.Wrapf(err, "failed to write decompressed block %v to volume %v", blk.BlockChecksum, volumeName)
}

func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRestoreConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ const (
// For lock mechanism, please refer to: https://github.com/longhorn/longhorn/blob/master/enhancements/20200701-backupstore-file-locks.md#proposal
// Currently the lock file is stored in each BackupVolume folder.
// For BackingImage Lock it is also stored there with the folder name "BACKINGIMAGE" defined here.
// To prevent Longhorn from accidently considering it as another normal BackupVolume,
// we use upppercase here so it will be filtered out when listing.
// To prevent Longhorn from accidentally considering it as another normal BackupVolume,
// we use uppercase here so it will be filtered out when listing.
BackupBackingImageLockName = "BACKINGIMAGE"
)

Expand Down
35 changes: 33 additions & 2 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,50 @@ func CompressData(method string, data []byte) (io.ReadSeeker, error) {
func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Reader, error) {
r, err := newDecompressionReader(method, src)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create decompression reader")
}
defer r.Close()
block, err := io.ReadAll(r)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read decompressed data")
}
if GetChecksum(block) != checksum {
return nil, fmt.Errorf("checksum verification failed for block")
}
return bytes.NewReader(block), nil
}

func DecompressAndVerifyWithFallback(decompression string, rc io.ReadCloser, checksum string) (io.Reader, error) {
r, err := DecompressAndVerify(decompression, rc, checksum)
if err == nil {
return r, nil
}
// Fall back to other decompression method if the current one fails
// The mitigation will be removed after identifying https://github.com/longhorn/longhorn/issues/7687
// Seek rc to offset 0
seeker, ok := rc.(io.Seeker)
if !ok {
return nil, errors.Wrapf(err, "failed to cast to io.Seeker for block %v", checksum)
}

_, errFallback := seeker.Seek(0, io.SeekStart)
if errFallback != nil {
// Merge the err1 and err2 and error out
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to seek to offset 0 for block %v", checksum)
}

if strings.Contains(err.Error(), gzip.ErrHeader.Error()) {
r, errFallback = DecompressAndVerify("lz4", rc, checksum)
} else if strings.Contains(err.Error(), "lz4: bad magic number") {
r, errFallback = DecompressAndVerify("gzip", rc, checksum)
}
if errFallback != nil {
return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to decompress and verify block %v with fallback", checksum)
}

return r, nil
}

func newCompressionWriter(method string, buffer io.Writer) (io.WriteCloser, error) {
switch method {
case "gzip":
Expand Down

0 comments on commit 863e842

Please sign in to comment.