From e68bf66df943625cd37ce0eaf4c01471f0842eed Mon Sep 17 00:00:00 2001 From: ostempel Date: Tue, 3 Dec 2024 13:51:58 +0100 Subject: [PATCH] fix decrompression of archives --- cmd/internal/backup/backup.go | 10 ++- cmd/internal/compress/compress.go | 86 +++++++++++++++++++------ cmd/internal/encryption/encryption.go | 5 -- cmd/internal/initializer/initializer.go | 9 ++- cmd/main.go | 36 +++++++---- 5 files changed, 104 insertions(+), 42 deletions(-) diff --git a/cmd/internal/backup/backup.go b/cmd/internal/backup/backup.go index 10c182f..faf61ea 100644 --- a/cmd/internal/backup/backup.go +++ b/cmd/internal/backup/backup.go @@ -114,11 +114,19 @@ func (b *Backuper) CreateBackup(ctx context.Context) error { defer writer1.Close() defer close(compressErr) - err := b.comp.Compress(ctx, backupFilePath, writer1) + files, err := b.comp.BuildFilesForCompression(constants.BackupDir, backupArchiveName) + if err != nil { + b.metrics.CountError("build_files") + compressErr <- err + return + } + + err = b.comp.Compress(ctx, writer1, files) if err != nil { b.metrics.CountError("compress") b.log.Error("error compressing backup", "error", err) compressErr <- err + return } else { compressErr <- nil } diff --git a/cmd/internal/compress/compress.go b/cmd/internal/compress/compress.go index f85e965..6931ae0 100644 --- a/cmd/internal/compress/compress.go +++ b/cmd/internal/compress/compress.go @@ -4,66 +4,112 @@ import ( "context" "fmt" "io" + "io/fs" - "github.com/metal-stack/backup-restore-sidecar/pkg/constants" "github.com/mholt/archives" + "github.com/spf13/afero" ) type Compressor struct { + fs afero.Fs compressor *archives.CompressedArchive extension string } +type CompressorConfig struct { + Method string + FS afero.Fs +} + // New Returns a new Compressor -func New(method string) (*Compressor, error) { - c := &archives.CompressedArchive{} - switch method { +func New(config *CompressorConfig) (*Compressor, error) { + + if config.FS == nil { + config.FS = afero.NewOsFs() + } + + var c *archives.CompressedArchive + switch config.Method { case "tar": c = &archives.CompressedArchive{ - Archival: archives.Tar{}, + Archival: archives.Tar{}, + Extraction: archives.Tar{}, } case "targz": c = &archives.CompressedArchive{ Compression: archives.Gz{}, + Extraction: archives.Tar{}, Archival: archives.Tar{}, } case "tarlz4": c = &archives.CompressedArchive{ Compression: archives.Lz4{}, + Extraction: archives.Tar{}, Archival: archives.Tar{}, } default: - return nil, fmt.Errorf("unsupported compression method: %s", method) + return nil, fmt.Errorf("unsupported compression method: %s", config.Method) } - return &Compressor{compressor: c, extension: "." + method}, nil + return &Compressor{ + compressor: c, + extension: "." + config.Method, + fs: config.FS, + }, nil } // Compress the given backupFile and returns the full filename with the extension -func (c *Compressor) Compress(ctx context.Context, backupFilePath string, outputWriter io.Writer) error { - files, err := archives.FilesFromDisk(ctx, &archives.FromDiskOptions{}, map[string]string{ - constants.BackupDir: backupFilePath + c.Extension(), - }) - +func (c *Compressor) Compress(ctx context.Context, outputWriter io.Writer, files []archives.FileInfo) error { + err := c.compressor.Archive(ctx, outputWriter, files) if err != nil { - return fmt.Errorf("error while reading file in compressor: %w", err) - } - err = c.compressor.Archive(ctx, outputWriter, files) - if err != nil { - return fmt.Errorf("error while compressing file in compressor: %v", err) + return fmt.Errorf("error while compressing file in compressor: %w", err) } return nil } // Decompress the given backupFile -func (c *Compressor) Decompress(ctx context.Context, inputReader io.Reader) error { +func (c *Compressor) Decompress(ctx context.Context, inputReader io.Reader, restoreDir string) error { err := c.compressor.Extract(ctx, inputReader, func(ctx context.Context, f archives.FileInfo) error { - // do something with the file here; or, if you only want a specific file or directory, - // just return until you come across the desired f.NameInArchive value(s) + // open archive file + file, err := f.Open() + if err != nil { + return err + } + defer file.Close() + // create file in restore directory + outputFile, err := c.fs.Create(restoreDir + "/" + f.Name()) + if err != nil { + return err + } + defer outputFile.Close() + // copy file content + _, err = io.Copy(outputFile, file) + if err != nil { + return err + } return nil }) return err } +func (c *Compressor) BuildFilesForCompression(inputPath string, name string) ([]archives.FileInfo, error) { + files := []archives.FileInfo{} + stat, err := c.fs.Stat(inputPath) + if err != nil { + return nil, err + } + + files = append(files, archives.FileInfo{ + FileInfo: stat, + Header: c.extension, + NameInArchive: name, + Open: func() (fs.File, error) { + return c.fs.Open(inputPath) + }, + }) + + return files, nil +} + func (c *Compressor) Extension() string { return c.extension } diff --git a/cmd/internal/encryption/encryption.go b/cmd/internal/encryption/encryption.go index 3d3ff82..6b836ba 100644 --- a/cmd/internal/encryption/encryption.go +++ b/cmd/internal/encryption/encryption.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "log/slog" - "os" "path/filepath" "strings" "unicode" @@ -116,10 +115,6 @@ func (e *Encrypter) createCipher() (cipher.Block, error) { return aes.NewCipher(key) } -func (e *Encrypter) openOutputFile(output string) (afero.File, error) { - return e.fs.OpenFile(output, os.O_RDWR|os.O_CREATE, 0644) -} - // generateIV() returns unique initalization vector of same size as cipher block for encryption func (e *Encrypter) generateIV(block cipher.Block) ([]byte, error) { iv := make([]byte, block.BlockSize()) diff --git a/cmd/internal/initializer/initializer.go b/cmd/internal/initializer/initializer.go index 59c72e8..0851e3c 100644 --- a/cmd/internal/initializer/initializer.go +++ b/cmd/internal/initializer/initializer.go @@ -265,8 +265,13 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers decryptErr <- nil } } else { - io.Copy(writer2, downloadBuffer) i.log.Info("restoring unencrypted backup with configured encryption - skipping decryption...") + _, err := io.Copy(writer2, downloadBuffer) + if err != nil { + i.metrics.CountError("streaming") + i.log.Error("error streaming downloaded data", "error", err) + decryptErr <- err + } decryptErr <- nil } }() @@ -284,7 +289,7 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers } i.currentStatus.Message = "uncompressing backup" - err = i.comp.Decompress(ctx, decryptBuffer) + err = i.comp.Decompress(ctx, decryptBuffer, constants.RestoreDir) if err != nil { return fmt.Errorf("unable to uncompress backup: %w", err) } diff --git a/cmd/main.go b/cmd/main.go index c1b8a89..eed9782 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -103,12 +103,13 @@ const ( ) var ( - cfgFile string - logger *slog.Logger - db database.Database - bp providers.BackupProvider - encrypter *encryption.Encrypter - stop context.Context + cfgFile string + logger *slog.Logger + db database.Database + bp providers.BackupProvider + encrypter *encryption.Encrypter + compressor *compress.Compressor + stop context.Context ) var rootCmd = &cobra.Command{ @@ -135,6 +136,9 @@ var startCmd = &cobra.Command{ if err := initEncrypter(); err != nil { return err } + if err := initCompressor(); err != nil { + return err + } return initBackupProvider() }, RunE: func(cmd *cobra.Command, args []string) error { @@ -153,12 +157,6 @@ var startCmd = &cobra.Command{ logger.Info("starting backup-restore-sidecar", "version", v.V, "bind-addr", addr) - comp, err := compress.New(viper.GetString(compressionMethod)) - logger.Info(comp.Extension()) - if err != nil { - return err - } - metrics := metrics.New() metrics.Start(logger.WithGroup("metrics")) @@ -168,11 +166,11 @@ var startCmd = &cobra.Command{ DatabaseProber: db, BackupProvider: bp, Metrics: metrics, - Compressor: comp, + Compressor: compressor, Encrypter: encrypter, }) - if err := initializer.New(logger.WithGroup("initializer"), addr, db, bp, comp, metrics, viper.GetString(databaseDatadirFlg), encrypter).Start(stop, backuper); err != nil { + if err := initializer.New(logger.WithGroup("initializer"), addr, db, bp, compressor, metrics, viper.GetString(databaseDatadirFlg), encrypter).Start(stop, backuper); err != nil { return err } @@ -554,6 +552,16 @@ func initDatabase() error { return nil } +func initCompressor() error { + var err error + method := viper.GetString(compressionMethod) + compressor, err = compress.New(&compress.CompressorConfig{Method: method}) + if err != nil { + return fmt.Errorf("unable to initialize compressor: %w", err) + } + return nil +} + func initEncrypter() error { var err error key := viper.GetString(encryptionKeyFlg)