
Commit

Fall back to other decompression method if the current one fails
The mitigation will be removed after identifying longhorn/longhorn#7687

Longhorn 7687

Signed-off-by: Derek Su <[email protected]>
derekbit committed Apr 17, 2024
1 parent 3a87ee0 commit 46a46f1
Showing 5 changed files with 143 additions and 11 deletions.
6 changes: 4 additions & 2 deletions backupbackingimage/backupbackingimage.go
@@ -482,9 +482,11 @@ func restoreBlockToFile(bsDriver backupstore.BackupStoreDriver, backingImageFile
 		return err
 	}
 	defer rc.Close()
-	r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)
+	r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
 	if err != nil {
-		return err
+		if r == nil {
+			return err
+		}
 	}
 
 	if _, err := backingImageFile.Seek(blk.Offset, 0); err != nil {
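
The call-site pattern above (repeated in deltablock.go below) treats a non-nil reader with a non-nil error as "fallback succeeded": only a nil reader is a hard failure. A minimal, self-contained sketch of that contract, assuming the module path github.com/longhorn/backupstore/util; the function name, its parameters, and the plain io.Copy are illustrative and not part of the commit:

package example

import (
	"io"

	"github.com/longhorn/backupstore/util"
)

// restoreBlockSketch mirrors the call sites changed in this commit: bail out only
// when DecompressAndVerifyWithFallback returns no reader at all.
func restoreBlockSketch(decompression string, rc io.ReadCloser, checksum string, dst io.WriteSeeker, offset int64) error {
	r, err := util.DecompressAndVerifyWithFallback(decompression, rc, checksum)
	if err != nil {
		if r == nil {
			// Both the requested method and the fallback failed.
			return err
		}
		// r != nil: the block was recovered with the other method; keep going.
	}

	if _, err := dst.Seek(offset, io.SeekStart); err != nil {
		return err
	}
	_, err = io.Copy(dst, r)
	return err
}
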
19 changes: 14 additions & 5 deletions deltablock.go
@@ -252,6 +252,10 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
 		},
 	}
 
+	log = logrus.WithFields(logrus.Fields{
+		"compressionMethod": volume.CompressionMethod,
+	})
+
 	// keep lock alive for async go routine.
 	if err := lock.Lock(); err != nil {
 		deltaOps.CloseSnapshot(snapshot.Name, volume.Name)
@@ -264,6 +268,7 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc
 	deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), 0, "", "")
 
 	log.Info("Performing delta block backup")
+
 	if progress, backup, err := performBackup(bsDriver, config, delta, deltaBackup, backupRequest.lastBackup); err != nil {
 		logrus.WithError(err).Errorf("Failed to perform backup for volume %v snapshot %v", volume.Name, snapshot.Name)
 		deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), progress, "", err.Error())
@@ -738,18 +743,22 @@ func restoreBlockToFile(bsDriver BackupStoreDriver, volumeName string, volDev *o
 	blkFile := getBlockFilePath(volumeName, blk.BlockChecksum)
 	rc, err := bsDriver.Read(blkFile)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "failed to read block %v with checksum %v", blkFile, blk.BlockChecksum)
 	}
 	defer rc.Close()
-	r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum)
+
+	r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum)
 	if err != nil {
-		return err
+		if r == nil {
+			return err
+		}
 	}
 
 	if _, err := volDev.Seek(blk.Offset, 0); err != nil {
-		return err
+		return errors.Wrapf(err, "failed to seek to offset %v for decompressed block %v", blk.Offset, blkFile)
 	}
 	_, err = io.CopyN(volDev, r, DEFAULT_BLOCK_SIZE)
-	return err
+	return errors.Wrapf(err, "failed to write decompressed block %v to volume %v", blkFile, volumeName)
 }
 
 func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRestoreConfig) error {
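
One subtlety in restoreBlockToFile above: the function now ends with return errors.Wrapf(err, ...) even on the success path. Assuming errors here is github.com/pkg/errors, as used elsewhere in Longhorn, this is safe because Wrapf returns nil when the wrapped error is nil. A small stand-alone sketch:

package main

import (
	"fmt"
	"io"

	"github.com/pkg/errors"
)

func main() {
	// Wrapf returns nil for a nil error, so a successful io.CopyN still yields nil.
	fmt.Println(errors.Wrapf(nil, "failed to write block %v", "blk-0")) // <nil>

	// With a real error, the cause is preserved and context is added.
	err := errors.Wrapf(io.ErrUnexpectedEOF, "failed to write block %v", "blk-0")
	fmt.Println(err) // failed to write block blk-0: unexpected EOF
}
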
4 changes: 2 additions & 2 deletions types/types.go
@@ -57,8 +57,8 @@ const (
 	// For lock mechanism, please refer to: https://github.com/longhorn/longhorn/blob/master/enhancements/20200701-backupstore-file-locks.md#proposal
 	// Currently the lock file is stored in each BackupVolume folder.
 	// For BackingImage Lock it is also stored there with the folder name "BACKINGIMAGE" defined here.
-	// To prevent Longhorn from accidently considering it as another normal BackupVolume,
-	// we use upppercase here so it will be filtered out when listing.
+	// To prevent Longhorn from accidentally considering it as another normal BackupVolume,
+	// we use uppercase here so it will be filtered out when listing.
 	BackupBackingImageLockName = "BACKINGIMAGE"
 )
37 changes: 35 additions & 2 deletions util/util.go
@@ -120,19 +120,52 @@ func CompressData(method string, data []byte) (io.ReadSeeker, error) {
 func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Reader, error) {
 	r, err := newDecompressionReader(method, src)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to create decompression reader")
 	}
 	defer r.Close()
 	block, err := io.ReadAll(r)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to read decompressed data")
 	}
 	if GetChecksum(block) != checksum {
 		return nil, fmt.Errorf("checksum verification failed for block")
 	}
 	return bytes.NewReader(block), nil
 }
 
+// DecompressAndVerifyWithFallback decompresses the given data and verifies the data integrity.
+// If the decompression fails, it will try to decompress with the fallback method.
+func DecompressAndVerifyWithFallback(decompression string, rc io.ReadCloser, checksum string) (io.Reader, error) {
+	r, err := DecompressAndVerify(decompression, rc, checksum)
+	if err == nil {
+		return r, nil
+	}
+	// Fall back to other decompression method if the current one fails
+	// The mitigation will be removed after identifying https://github.com/longhorn/longhorn/issues/7687
+	// Seek rc to offset 0
+	seeker, ok := rc.(io.Seeker)
+	if !ok {
+		return nil, errors.Wrapf(err, "failed to cast to io.Seeker for block %v", checksum)
+	}
+
+	_, errFallback := seeker.Seek(0, io.SeekStart)
+	if errFallback != nil {
+		// Merge the err1 and err2 and error out
+		return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to seek to offset 0 for block %v", checksum)
+	}
+
+	if strings.Contains(err.Error(), gzip.ErrHeader.Error()) {
+		r, errFallback = DecompressAndVerify("lz4", rc, checksum)
+	} else if strings.Contains(err.Error(), "lz4: bad magic number") {
+		r, errFallback = DecompressAndVerify("gzip", rc, checksum)
+	}
+	if errFallback != nil {
+		return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to decompress and verify block %v with fallback", checksum)
+	}
+
+	return r, err
+}
+
 func newCompressionWriter(method string, buffer io.Writer) (io.WriteCloser, error) {
 	switch method {
 	case "gzip":
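
A stand-alone round trip showing the new helper in action: compress a block with one method, then request the other method on restore and let the fallback recover it. CompressData, GetChecksum, and DecompressAndVerifyWithFallback are the functions shown in this diff; the readSeekCloser wrapper, the module path github.com/longhorn/backupstore/util, and the main harness are assumptions for illustration:

package main

import (
	"fmt"
	"io"

	"github.com/longhorn/backupstore/util"
)

// readSeekCloser adapts an io.ReadSeeker into an io.ReadCloser while keeping Seek
// reachable, because the fallback path type-asserts rc.(io.Seeker) to rewind the
// stream before retrying with the other method.
type readSeekCloser struct{ io.ReadSeeker }

func (readSeekCloser) Close() error { return nil }

func main() {
	data := []byte("some block payload")
	checksum := util.GetChecksum(data)

	// Compress with lz4 ...
	compressed, err := util.CompressData("lz4", data)
	if err != nil {
		panic(err)
	}

	// ... but ask for gzip on restore: the gzip header check fails, the reader is
	// rewound, and lz4 is tried before giving up.
	r, err := util.DecompressAndVerifyWithFallback("gzip", readSeekCloser{compressed}, checksum)
	if r == nil {
		panic(err) // both methods failed
	}
	restored, _ := io.ReadAll(r)
	fmt.Printf("restored %q (fallback error recorded: %v)\n", restored, err != nil)
}
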
88 changes: 88 additions & 0 deletions util/util_test.go
@@ -4,6 +4,7 @@ import (
 	"io"
 	"math/rand"
 	"net/url"
+	"os"
 	"os/exec"
 	"path/filepath"
 	"strconv"
@@ -78,6 +79,93 @@ func (s *TestSuite) TestCompress(c *C) {
 	}
 }
 
+type ReadSeekerCloser struct {
+	seeker io.ReadSeeker
+}
+
+func (r *ReadSeekerCloser) Read(p []byte) (n int, err error) {
+	return r.seeker.Read(p)
+}
+
+func (r *ReadSeekerCloser) Close() error {
+	return nil
+}
+
+func NewReadSeekerCloser(seeker io.ReadSeeker) io.ReadCloser {
+	return &ReadSeekerCloser{seeker: seeker}
+}
+
+func (s *TestSuite) TestCompressFallback(c *C) {
+	testsets := [][]string{
+		{"gzip", "lz4"},
+		{"lz4", "gzip"},
+	}
+
+	for _, testset := range testsets {
+		data := []byte("Some random string")
+		checksum := GetChecksum(data)
+
+		compressedData, err := compressAndSave(testset[0], data)
+		c.Assert(err, IsNil)
+		defer cleanupFile(compressedData.tmpFile, c)
+
+		decompressedData, err := decompressAndVerify(testset[1], compressedData.tmpFile, string(checksum))
+		c.Assert(err, IsNil)
+
+		result, err := io.ReadAll(decompressedData)
+		c.Assert(err, IsNil)
+
+		c.Assert(result, DeepEquals, data)
+	}
+}
+
+// Compresses data and saves it to a temporary file
+func compressAndSave(compressionType string, data []byte) (*compressedData, error) {
+	compressed, err := CompressData(compressionType, data)
+	if err != nil {
+		return nil, err
+	}
+
+	tmpFile := filepath.Join(testRoot, GenerateRandString())
+	f, err := os.Create(tmpFile)
+	if err != nil {
+		return nil, err
+	}
+
+	_, err = io.Copy(f, compressed)
+	if err != nil {
+		f.Close()
+		os.Remove(tmpFile)
+		return nil, err
+	}
+
+	return &compressedData{file: f, tmpFile: tmpFile}, nil
+}
+
+// Decompresses and verifies data from a file
+func decompressAndVerify(decompressionType, file string, checksum string) (io.Reader, error) {
+	f, err := os.Open(file)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	rc := NewReadSeekerCloser(f)
+	return DecompressAndVerifyWithFallback(decompressionType, rc, string(checksum))
+}
+
+// Struct to hold compressed data and temporary file information
+type compressedData struct {
+	file    *os.File
+	tmpFile string
+}
+
+// Cleans up temporary files
+func cleanupFile(tmpFile string, c *C) {
+	err := os.Remove(tmpFile)
+	c.Assert(err, IsNil)
+}
+
 func GenerateRandString() string {
 	r := make([]rune, nameLength)
 	r[0] = firstLetters[rand.Intn(len(firstLetters))]
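
Note that the fallback only retries when the io.ReadCloser handed to DecompressAndVerifyWithFallback can be type-asserted to io.Seeker, so the stream can be rewound; io.NopCloser hides Seek and would not qualify. A small stand-alone check of that requirement; the seekableReadCloser wrapper here is illustrative, not part of the commit:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// seekableReadCloser satisfies io.ReadCloser while keeping the embedded Seek
// method reachable for the rc.(io.Seeker) assertion in the fallback path.
type seekableReadCloser struct{ io.ReadSeeker }

func (seekableReadCloser) Close() error { return nil }

func main() {
	rs := bytes.NewReader([]byte("payload"))

	// io.NopCloser exposes only Read and Close, so Seek is not reachable.
	_, ok := io.NopCloser(rs).(io.Seeker)
	fmt.Println("NopCloser seekable:", ok) // false

	// The wrapper keeps Seek visible through the io.ReadCloser value.
	var rc io.ReadCloser = seekableReadCloser{rs}
	_, ok = rc.(io.Seeker)
	fmt.Println("wrapper seekable:", ok) // true
}
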
