Skip to content

Commit

Permalink
add streaming to upload/download of backup
Browse files Browse the repository at this point in the history
  • Loading branch information
ostempel committed Dec 10, 2024
1 parent c859648 commit 4ce2dce
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 73 deletions.
8 changes: 7 additions & 1 deletion cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
b.log.Info("encrypted backup")
}

err = b.bp.UploadBackup(ctx, filename)
file, err := os.Open(filename)
if err != nil {
b.metrics.CountError("open")
return fmt.Errorf("error opening backup file: %w", err)
}

err = b.bp.UploadBackup(ctx, file, filename)
if err != nil {
b.metrics.CountError("upload")
return fmt.Errorf("error uploading backup: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/internal/backup/providers/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providers

import (
"context"
"io"
"time"
)

Expand All @@ -10,8 +11,8 @@ type BackupProvider interface {
ListBackups(ctx context.Context) (BackupVersions, error)
CleanupBackups(ctx context.Context) error
GetNextBackupName(ctx context.Context) string
DownloadBackup(ctx context.Context, version *BackupVersion, outDir string) (string, error)
UploadBackup(ctx context.Context, sourcePath string) error
DownloadBackup(ctx context.Context, version *BackupVersion, writer io.Writer) error
UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error
}

type BackupVersions interface {
Expand Down
32 changes: 8 additions & 24 deletions cmd/internal/backup/providers/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ func (b *BackupProviderGCP) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, writer io.Writer) error {
gen, err := strconv.ParseInt(version.Version, 10, 64)
if err != nil {
return "", err
return err
}

bucket := b.c.Bucket(b.config.BucketName)
Expand All @@ -160,40 +160,24 @@ func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *provide
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

b.log.Info("downloading", "object", version.Name, "gen", gen, "to", backupFilePath)

r, err := bucket.Object(version.Name).Generation(gen).NewReader(ctx)
if err != nil {
return "", fmt.Errorf("backup not found: %w", err)
return fmt.Errorf("backup not found: %w", err)
}
defer r.Close()

f, err := b.fs.Create(backupFilePath)
if err != nil {
return "", err
}
defer f.Close()

_, err = io.Copy(f, r)
_, err = io.Copy(writer, r)
if err != nil {
return "", fmt.Errorf("error writing file from gcp to filesystem: %w", err)
return fmt.Errorf("error writing file from gcp to filesystem: %w", err)
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error {
bucket := b.c.Bucket(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
Expand All @@ -203,7 +187,7 @@ func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string)

obj := bucket.Object(destination)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
if _, err := io.Copy(w, reader); err != nil {
return err
}
defer w.Close()
Expand Down
26 changes: 17 additions & 9 deletions cmd/internal/backup/providers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
Expand All @@ -11,7 +12,6 @@ import (
"errors"

"github.com/metal-stack/backup-restore-sidecar/cmd/internal/backup/providers"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"
)
Expand Down Expand Up @@ -86,28 +86,36 @@ func (b *BackupProviderLocal) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, writer io.Writer) error {
b.log.Info("download backup called for provider local")

source := filepath.Join(b.config.LocalBackupPath, version.Name)

backupFilePath := filepath.Join(outDir, version.Name)
infile, err := b.fs.Open(source)
if err != nil {
return fmt.Errorf("could not open file %s: %w", source, err)
}
defer infile.Close()

err := utils.Copy(b.fs, source, backupFilePath)
_, err = io.Copy(writer, infile)
if err != nil {
return "", err
return err
}

return backupFilePath, err
return err
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderLocal) UploadBackup(_ context.Context, sourcePath string) error {
// UploadBackup uploads a backup to the backup provider by providing a reader to the backup archive
func (b *BackupProviderLocal) UploadBackup(_ context.Context, reader io.Reader, sourcePath string) error {
b.log.Info("upload backups called for provider local")

destination := filepath.Join(b.config.LocalBackupPath, filepath.Base(sourcePath))
output, err := b.fs.Create(destination)
if err != nil {
return fmt.Errorf("could not create file %s: %w", destination, err)
}

err := utils.Copy(b.fs, sourcePath, destination)
_, err = io.Copy(output, reader)
if err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions cmd/internal/backup/providers/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ func Test_BackupProviderLocal(t *testing.T) {
err = afero.WriteFile(fs, backupPath, []byte(backupContent), 0600)
require.NoError(t, err)

err = p.UploadBackup(ctx, backupPath)
infile, err := fs.Open(backupPath)
require.NoError(t, err)

err = p.UploadBackup(ctx, infile, backupPath)
require.NoError(t, err)

localPath := path.Join(localProviderBackupPath, backupName)
_, err := fs.Stat(localPath)
_, err = fs.Stat(localPath)
require.NoError(t, err)

backupFiles, err := afero.ReadDir(fs, localProviderBackupPath)
Expand Down Expand Up @@ -130,16 +133,19 @@ func Test_BackupProviderLocal(t *testing.T) {
latestVersion := versions.Latest()
require.NotNil(t, latestVersion)

backupFilePath, err := p.DownloadBackup(ctx, latestVersion, "")
outputFile, err := fs.Create("output.tar.gz")
require.NoError(t, err)

err = p.DownloadBackup(ctx, latestVersion, outputFile)
require.NoError(t, err)

gotContent, err := afero.ReadFile(fs, backupFilePath)
gotContent, err := afero.ReadFile(fs, outputFile.Name())
require.NoError(t, err)

require.Equal(t, fmt.Sprintf("precious data %d", backupAmount), string(gotContent))

// cleaning up after test
err = fs.Remove(backupFilePath)
err = fs.Remove(outputFile.Name())
require.NoError(t, err)
})

Expand Down
43 changes: 15 additions & 28 deletions cmd/internal/backup/providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package s3

import (
"context"
"io"
"log/slog"
"path/filepath"
"strings"

"errors"

Expand Down Expand Up @@ -189,49 +189,36 @@ func (b *BackupProviderS3) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderS3) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderS3) DownloadBackup(ctx context.Context, version *providers.BackupVersion, writer io.Writer) error {
bucket := aws.String(b.config.BucketName)

downloadFileName := version.Name
if strings.Contains(downloadFileName, "/") {
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

f, err := b.fs.Create(backupFilePath)
if err != nil {
return "", err
}
defer f.Close()

downloader := s3manager.NewDownloader(b.sess)

_, err = downloader.DownloadWithContext(
buf := aws.NewWriteAtBuffer([]byte{})
_, err := downloader.DownloadWithContext(
ctx,
f,
buf,
&s3.GetObjectInput{
Bucket: bucket,
Key: &version.Name,
VersionId: &version.Version,
})
if err != nil {
return "", err
return err
}

_, err = writer.Write(buf.Bytes())
if err != nil {
return err
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderS3) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderS3) UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error {
bucket := aws.String(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
Expand All @@ -240,10 +227,10 @@ func (b *BackupProviderS3) UploadBackup(ctx context.Context, sourcePath string)
b.log.Debug("uploading object", "src", sourcePath, "dest", destination)

uploader := s3manager.NewUploader(b.sess)
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: bucket,
Key: aws.String(destination),
Body: r,
Body: reader,
})
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion cmd/internal/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,12 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers
return fmt.Errorf("could not delete priorly downloaded file: %w", err)
}

backupFilePath, err := i.bp.DownloadBackup(ctx, version, constants.DownloadDir)
outputFile, err := os.Open(backupFilePath)
if err != nil {
return fmt.Errorf("could not open file for writing: %w", err)
}

err = i.bp.DownloadBackup(ctx, version, outputFile)
if err != nil {
return fmt.Errorf("unable to download backup: %w", err)
}
Expand Down
13 changes: 10 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"

v1 "github.com/metal-stack/backup-restore-sidecar/api/v1"
Expand Down Expand Up @@ -300,15 +301,21 @@ var downloadBackupCmd = &cobra.Command{

output := viper.GetString(downloadOutputFlg)

destination, err := bp.DownloadBackup(context.Background(), &providers.BackupVersion{Name: backup.GetBackup().GetName()}, output)
outputPath := filepath.Join(output, backup.GetBackup().GetName())
outputFile, err := os.Open(outputPath)
if err != nil {
return fmt.Errorf("failed opening output file: %w", err)
}

err = bp.DownloadBackup(context.Background(), &providers.BackupVersion{Name: backup.GetBackup().GetName()}, outputFile)

if err != nil {
return fmt.Errorf("failed downloading backup: %w", err)
}

if encrypter != nil {
if encryption.IsEncrypted(destination) {
_, err = encrypter.Decrypt(destination)
if encryption.IsEncrypted(outputPath) {
_, err = encrypter.Decrypt(outputPath)
if err != nil {
return fmt.Errorf("unable to decrypt backup: %w", err)
}
Expand Down

0 comments on commit 4ce2dce

Please sign in to comment.