From 46a46f1bb4fd02f89171cb35815c69c5c19e4862 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Wed, 17 Apr 2024 06:44:05 +0000 Subject: [PATCH] Fall back to other decompression method if the current one fails The mitigation will be removed after identifying longhorn/longhorn#7687 Longhorn 7687 Signed-off-by: Derek Su --- backupbackingimage/backupbackingimage.go | 6 +- deltablock.go | 19 +++-- types/types.go | 4 +- util/util.go | 37 +++++++++- util/util_test.go | 88 ++++++++++++++++++++++++ 5 files changed, 143 insertions(+), 11 deletions(-) diff --git a/backupbackingimage/backupbackingimage.go b/backupbackingimage/backupbackingimage.go index 6e10b0152..672b17ff6 100644 --- a/backupbackingimage/backupbackingimage.go +++ b/backupbackingimage/backupbackingimage.go @@ -482,9 +482,11 @@ func restoreBlockToFile(bsDriver backupstore.BackupStoreDriver, backingImageFile return err } defer rc.Close() - r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum) + r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum) if err != nil { - return err + if r == nil { + return err + } } if _, err := backingImageFile.Seek(blk.Offset, 0); err != nil { diff --git a/deltablock.go b/deltablock.go index 73851e53b..7c2ace7be 100644 --- a/deltablock.go +++ b/deltablock.go @@ -252,6 +252,10 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc }, } + log = logrus.WithFields(logrus.Fields{ + "compressionMethod": volume.CompressionMethod, + }) + // keep lock alive for async go routine. if err := lock.Lock(); err != nil { deltaOps.CloseSnapshot(snapshot.Name, volume.Name) @@ -264,6 +268,7 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), 0, "", "") log.Info("Performing delta block backup") + if progress, backup, err := performBackup(bsDriver, config, delta, deltaBackup, backupRequest.lastBackup); err != nil { logrus.WithError(err).Errorf("Failed to perform backup for volume %v snapshot %v", volume.Name, snapshot.Name) deltaOps.UpdateBackupStatus(snapshot.Name, volume.Name, string(types.ProgressStateInProgress), progress, "", err.Error()) @@ -738,18 +743,22 @@ func restoreBlockToFile(bsDriver BackupStoreDriver, volumeName string, volDev *o blkFile := getBlockFilePath(volumeName, blk.BlockChecksum) rc, err := bsDriver.Read(blkFile) if err != nil { - return err + return errors.Wrapf(err, "failed to read block %v with checksum %v", blkFile, blk.BlockChecksum) } defer rc.Close() - r, err := util.DecompressAndVerify(decompression, rc, blk.BlockChecksum) + + r, err := util.DecompressAndVerifyWithFallback(decompression, rc, blk.BlockChecksum) if err != nil { - return err + if r == nil { + return err + } } + if _, err := volDev.Seek(blk.Offset, 0); err != nil { - return err + return errors.Wrapf(err, "failed to seek to offset %v for decompressed block %v", blk.Offset, blkFile) } _, err = io.CopyN(volDev, r, DEFAULT_BLOCK_SIZE) - return err + return errors.Wrapf(err, "failed to write decompressed block %v to volume %v", blkFile, volumeName) } func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRestoreConfig) error { diff --git a/types/types.go b/types/types.go index bc626cc5f..471460edc 100644 --- a/types/types.go +++ b/types/types.go @@ -57,8 +57,8 @@ const ( // For lock mechanism, please refer to: https://github.com/longhorn/longhorn/blob/master/enhancements/20200701-backupstore-file-locks.md#proposal // Currently the lock file is stored in each BackupVolume folder. // For BackingImage Lock it is also stored there with the folder name "BACKINGIMAGE" defined here. - // To prevent Longhorn from accidently considering it as another normal BackupVolume, - // we use upppercase here so it will be filtered out when listing. + // To prevent Longhorn from accidentally considering it as another normal BackupVolume, + // we use uppercase here so it will be filtered out when listing. BackupBackingImageLockName = "BACKINGIMAGE" ) diff --git a/util/util.go b/util/util.go index 7f066f8ed..394050110 100644 --- a/util/util.go +++ b/util/util.go @@ -120,12 +120,12 @@ func CompressData(method string, data []byte) (io.ReadSeeker, error) { func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Reader, error) { r, err := newDecompressionReader(method, src) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create decompression reader") } defer r.Close() block, err := io.ReadAll(r) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to read decompressed data") } if GetChecksum(block) != checksum { return nil, fmt.Errorf("checksum verification failed for block") @@ -133,6 +133,39 @@ func DecompressAndVerify(method string, src io.Reader, checksum string) (io.Read return bytes.NewReader(block), nil } +// DecompressAndVerifyWithFallback decompresses the given data and verifies the data integrity. +// If the decompression fails, it will try to decompress with the fallback method. +func DecompressAndVerifyWithFallback(decompression string, rc io.ReadCloser, checksum string) (io.Reader, error) { + r, err := DecompressAndVerify(decompression, rc, checksum) + if err == nil { + return r, nil + } + // Fall back to other decompression method if the current one fails + // The mitigation will be removed after identifying https://github.com/longhorn/longhorn/issues/7687 + // Seek rc to offset 0 + seeker, ok := rc.(io.Seeker) + if !ok { + return nil, errors.Wrapf(err, "failed to cast to io.Seeker for block %v", checksum) + } + + _, errFallback := seeker.Seek(0, io.SeekStart) + if errFallback != nil { + // Merge the err1 and err2 and error out + return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to seek to offset 0 for block %v", checksum) + } + + if strings.Contains(err.Error(), gzip.ErrHeader.Error()) { + r, errFallback = DecompressAndVerify("lz4", rc, checksum) + } else if strings.Contains(err.Error(), "lz4: bad magic number") { + r, errFallback = DecompressAndVerify("gzip", rc, checksum) + } + if errFallback != nil { + return nil, errors.Wrapf(multierr.Append(err, errFallback), "failed to decompress and verify block %v with fallback", checksum) + } + + return r, err +} + func newCompressionWriter(method string, buffer io.Writer) (io.WriteCloser, error) { switch method { case "gzip": diff --git a/util/util_test.go b/util/util_test.go index ac61a49ff..86cbfbc43 100644 --- a/util/util_test.go +++ b/util/util_test.go @@ -4,6 +4,7 @@ import ( "io" "math/rand" "net/url" + "os" "os/exec" "path/filepath" "strconv" @@ -78,6 +79,93 @@ func (s *TestSuite) TestCompress(c *C) { } } +type ReadSeekerCloser struct { + seeker io.ReadSeeker +} + +func (r *ReadSeekerCloser) Read(p []byte) (n int, err error) { + return r.seeker.Read(p) +} + +func (r *ReadSeekerCloser) Close() error { + return nil +} + +func NewReadSeekerCloser(seeker io.ReadSeeker) io.ReadCloser { + return &ReadSeekerCloser{seeker: seeker} +} + +func (s *TestSuite) TestCompressFallback(c *C) { + testsets := [][]string{ + {"gzip", "lz4"}, + {"lz4", "gzip"}, + } + + for _, testset := range testsets { + data := []byte("Some random string") + checksum := GetChecksum(data) + + compressedData, err := compressAndSave(testset[0], data) + c.Assert(err, IsNil) + defer cleanupFile(compressedData.tmpFile, c) + + decompressedData, err := decompressAndVerify(testset[1], compressedData.tmpFile, string(checksum)) + c.Assert(err, IsNil) + + result, err := io.ReadAll(decompressedData) + c.Assert(err, IsNil) + + c.Assert(result, DeepEquals, data) + } +} + +// Compresses data and saves it to a temporary file +func compressAndSave(compressionType string, data []byte) (*compressedData, error) { + compressed, err := CompressData(compressionType, data) + if err != nil { + return nil, err + } + + tmpFile := filepath.Join(testRoot, GenerateRandString()) + f, err := os.Create(tmpFile) + if err != nil { + return nil, err + } + + _, err = io.Copy(f, compressed) + if err != nil { + f.Close() + os.Remove(tmpFile) + return nil, err + } + + return &compressedData{file: f, tmpFile: tmpFile}, nil +} + +// Decompresses and verifies data from a file +func decompressAndVerify(decompressionType, file string, checksum string) (io.Reader, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + defer f.Close() + + rc := NewReadSeekerCloser(f) + return DecompressAndVerifyWithFallback(decompressionType, rc, string(checksum)) +} + +// Struct to hold compressed data and temporary file information +type compressedData struct { + file *os.File + tmpFile string +} + +// Cleans up temporary files +func cleanupFile(tmpFile string, c *C) { + err := os.Remove(tmpFile) + c.Assert(err, IsNil) +} + func GenerateRandString() string { r := make([]rune, nameLength) r[0] = firstLetters[rand.Intn(len(firstLetters))]