Skip to content

Commit

Permalink
fix decrompression of archives
Browse files Browse the repository at this point in the history
  • Loading branch information
ostempel committed Dec 3, 2024
1 parent 148ce9b commit e68bf66
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 42 deletions.
10 changes: 9 additions & 1 deletion cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,19 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
defer writer1.Close()
defer close(compressErr)

err := b.comp.Compress(ctx, backupFilePath, writer1)
files, err := b.comp.BuildFilesForCompression(constants.BackupDir, backupArchiveName)
if err != nil {
b.metrics.CountError("build_files")
compressErr <- err
return
}

err = b.comp.Compress(ctx, writer1, files)
if err != nil {
b.metrics.CountError("compress")
b.log.Error("error compressing backup", "error", err)
compressErr <- err
return
} else {
compressErr <- nil
}
Expand Down
86 changes: 66 additions & 20 deletions cmd/internal/compress/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,112 @@ import (
"context"
"fmt"
"io"
"io/fs"

"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/mholt/archives"
"github.com/spf13/afero"
)

type Compressor struct {
fs afero.Fs
compressor *archives.CompressedArchive
extension string
}

type CompressorConfig struct {
Method string
FS afero.Fs
}

// New Returns a new Compressor
func New(method string) (*Compressor, error) {
c := &archives.CompressedArchive{}
switch method {
func New(config *CompressorConfig) (*Compressor, error) {

if config.FS == nil {
config.FS = afero.NewOsFs()
}

var c *archives.CompressedArchive
switch config.Method {
case "tar":
c = &archives.CompressedArchive{
Archival: archives.Tar{},
Archival: archives.Tar{},
Extraction: archives.Tar{},
}
case "targz":
c = &archives.CompressedArchive{
Compression: archives.Gz{},
Extraction: archives.Tar{},
Archival: archives.Tar{},
}
case "tarlz4":
c = &archives.CompressedArchive{
Compression: archives.Lz4{},
Extraction: archives.Tar{},
Archival: archives.Tar{},
}
default:
return nil, fmt.Errorf("unsupported compression method: %s", method)
return nil, fmt.Errorf("unsupported compression method: %s", config.Method)
}
return &Compressor{compressor: c, extension: "." + method}, nil
return &Compressor{
compressor: c,
extension: "." + config.Method,
fs: config.FS,
}, nil
}

// Compress the given backupFile and returns the full filename with the extension
func (c *Compressor) Compress(ctx context.Context, backupFilePath string, outputWriter io.Writer) error {
files, err := archives.FilesFromDisk(ctx, &archives.FromDiskOptions{}, map[string]string{
constants.BackupDir: backupFilePath + c.Extension(),
})

func (c *Compressor) Compress(ctx context.Context, outputWriter io.Writer, files []archives.FileInfo) error {
err := c.compressor.Archive(ctx, outputWriter, files)
if err != nil {
return fmt.Errorf("error while reading file in compressor: %w", err)
}
err = c.compressor.Archive(ctx, outputWriter, files)
if err != nil {
return fmt.Errorf("error while compressing file in compressor: %v", err)
return fmt.Errorf("error while compressing file in compressor: %w", err)
}
return nil
}

// Decompress the given backupFile
func (c *Compressor) Decompress(ctx context.Context, inputReader io.Reader) error {
func (c *Compressor) Decompress(ctx context.Context, inputReader io.Reader, restoreDir string) error {
err := c.compressor.Extract(ctx, inputReader, func(ctx context.Context, f archives.FileInfo) error {
// do something with the file here; or, if you only want a specific file or directory,
// just return until you come across the desired f.NameInArchive value(s)
// open archive file
file, err := f.Open()
if err != nil {
return err
}
defer file.Close()
// create file in restore directory
outputFile, err := c.fs.Create(restoreDir + "/" + f.Name())
if err != nil {
return err
}
defer outputFile.Close()
// copy file content
_, err = io.Copy(outputFile, file)
if err != nil {
return err
}
return nil
})
return err
}

func (c *Compressor) BuildFilesForCompression(inputPath string, name string) ([]archives.FileInfo, error) {
files := []archives.FileInfo{}
stat, err := c.fs.Stat(inputPath)
if err != nil {
return nil, err
}

files = append(files, archives.FileInfo{
FileInfo: stat,
Header: c.extension,
NameInArchive: name,
Open: func() (fs.File, error) {
return c.fs.Open(inputPath)
},
})

return files, nil
}

func (c *Compressor) Extension() string {
return c.extension
}
5 changes: 0 additions & 5 deletions cmd/internal/encryption/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"unicode"
Expand Down Expand Up @@ -116,10 +115,6 @@ func (e *Encrypter) createCipher() (cipher.Block, error) {
return aes.NewCipher(key)
}

func (e *Encrypter) openOutputFile(output string) (afero.File, error) {
return e.fs.OpenFile(output, os.O_RDWR|os.O_CREATE, 0644)
}

// generateIV() returns unique initalization vector of same size as cipher block for encryption
func (e *Encrypter) generateIV(block cipher.Block) ([]byte, error) {
iv := make([]byte, block.BlockSize())
Expand Down
9 changes: 7 additions & 2 deletions cmd/internal/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,13 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers
decryptErr <- nil
}
} else {
io.Copy(writer2, downloadBuffer)
i.log.Info("restoring unencrypted backup with configured encryption - skipping decryption...")
_, err := io.Copy(writer2, downloadBuffer)
if err != nil {
i.metrics.CountError("streaming")
i.log.Error("error streaming downloaded data", "error", err)
decryptErr <- err
}
decryptErr <- nil
}
}()
Expand All @@ -284,7 +289,7 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers
}

i.currentStatus.Message = "uncompressing backup"
err = i.comp.Decompress(ctx, decryptBuffer)
err = i.comp.Decompress(ctx, decryptBuffer, constants.RestoreDir)
if err != nil {
return fmt.Errorf("unable to uncompress backup: %w", err)
}
Expand Down
36 changes: 22 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ const (
)

var (
cfgFile string
logger *slog.Logger
db database.Database
bp providers.BackupProvider
encrypter *encryption.Encrypter
stop context.Context
cfgFile string
logger *slog.Logger
db database.Database
bp providers.BackupProvider
encrypter *encryption.Encrypter
compressor *compress.Compressor
stop context.Context
)

var rootCmd = &cobra.Command{
Expand All @@ -135,6 +136,9 @@ var startCmd = &cobra.Command{
if err := initEncrypter(); err != nil {
return err
}
if err := initCompressor(); err != nil {
return err
}
return initBackupProvider()
},
RunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -153,12 +157,6 @@ var startCmd = &cobra.Command{

logger.Info("starting backup-restore-sidecar", "version", v.V, "bind-addr", addr)

comp, err := compress.New(viper.GetString(compressionMethod))
logger.Info(comp.Extension())
if err != nil {
return err
}

metrics := metrics.New()
metrics.Start(logger.WithGroup("metrics"))

Expand All @@ -168,11 +166,11 @@ var startCmd = &cobra.Command{
DatabaseProber: db,
BackupProvider: bp,
Metrics: metrics,
Compressor: comp,
Compressor: compressor,
Encrypter: encrypter,
})

if err := initializer.New(logger.WithGroup("initializer"), addr, db, bp, comp, metrics, viper.GetString(databaseDatadirFlg), encrypter).Start(stop, backuper); err != nil {
if err := initializer.New(logger.WithGroup("initializer"), addr, db, bp, compressor, metrics, viper.GetString(databaseDatadirFlg), encrypter).Start(stop, backuper); err != nil {
return err
}

Expand Down Expand Up @@ -554,6 +552,16 @@ func initDatabase() error {
return nil
}

func initCompressor() error {
var err error
method := viper.GetString(compressionMethod)
compressor, err = compress.New(&compress.CompressorConfig{Method: method})
if err != nil {
return fmt.Errorf("unable to initialize compressor: %w", err)
}
return nil
}

func initEncrypter() error {
var err error
key := viper.GetString(encryptionKeyFlg)
Expand Down

0 comments on commit e68bf66

Please sign in to comment.