Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add streaming to upload/download of backup #102

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
b.log.Info("encrypted backup")
}

err = b.bp.UploadBackup(ctx, filename)
file, err := os.Open(filename)
if err != nil {
b.metrics.CountError("open")
return fmt.Errorf("error opening backup file: %w", err)
}

err = b.bp.UploadBackup(ctx, file, filename)
if err != nil {
b.metrics.CountError("upload")
return fmt.Errorf("error uploading backup: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/internal/backup/providers/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providers

import (
"context"
"io"
"time"
)

Expand All @@ -10,8 +11,8 @@ type BackupProvider interface {
ListBackups(ctx context.Context) (BackupVersions, error)
CleanupBackups(ctx context.Context) error
GetNextBackupName(ctx context.Context) string
DownloadBackup(ctx context.Context, version *BackupVersion, outDir string) (string, error)
UploadBackup(ctx context.Context, sourcePath string) error
DownloadBackup(ctx context.Context, version *BackupVersion, writer io.Writer) error
UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error
ostempel marked this conversation as resolved.
Show resolved Hide resolved
}

type BackupVersions interface {
Expand Down
32 changes: 8 additions & 24 deletions cmd/internal/backup/providers/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ func (b *BackupProviderGCP) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, writer io.Writer) error {
gen, err := strconv.ParseInt(version.Version, 10, 64)
if err != nil {
return "", err
return err
}

bucket := b.c.Bucket(b.config.BucketName)
Expand All @@ -160,40 +160,24 @@ func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *provide
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

b.log.Info("downloading", "object", version.Name, "gen", gen, "to", backupFilePath)
ostempel marked this conversation as resolved.
Show resolved Hide resolved

r, err := bucket.Object(version.Name).Generation(gen).NewReader(ctx)
if err != nil {
return "", fmt.Errorf("backup not found: %w", err)
return fmt.Errorf("backup not found: %w", err)
}
defer r.Close()

f, err := b.fs.Create(backupFilePath)
if err != nil {
return "", err
}
defer f.Close()

_, err = io.Copy(f, r)
_, err = io.Copy(writer, r)
if err != nil {
return "", fmt.Errorf("error writing file from gcp to filesystem: %w", err)
return fmt.Errorf("error writing file from gcp to filesystem: %w", err)
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error {
bucket := b.c.Bucket(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
Expand All @@ -203,7 +187,7 @@ func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string)

obj := bucket.Object(destination)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
if _, err := io.Copy(w, reader); err != nil {
return err
}
defer w.Close()
Expand Down
26 changes: 17 additions & 9 deletions cmd/internal/backup/providers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
Expand All @@ -11,7 +12,6 @@ import (
"errors"

"github.com/metal-stack/backup-restore-sidecar/cmd/internal/backup/providers"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"
)
Expand Down Expand Up @@ -86,28 +86,36 @@ func (b *BackupProviderLocal) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, writer io.Writer) error {
b.log.Info("download backup called for provider local")

source := filepath.Join(b.config.LocalBackupPath, version.Name)

backupFilePath := filepath.Join(outDir, version.Name)
infile, err := b.fs.Open(source)
if err != nil {
return fmt.Errorf("could not open file %s: %w", source, err)
}
defer infile.Close()

err := utils.Copy(b.fs, source, backupFilePath)
_, err = io.Copy(writer, infile)
if err != nil {
return "", err
return err
}

return backupFilePath, err
return err
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderLocal) UploadBackup(_ context.Context, sourcePath string) error {
// UploadBackup uploads a backup to the backup provider by providing a reader to the backup archive
func (b *BackupProviderLocal) UploadBackup(_ context.Context, reader io.Reader, sourcePath string) error {
b.log.Info("upload backups called for provider local")

destination := filepath.Join(b.config.LocalBackupPath, filepath.Base(sourcePath))
output, err := b.fs.Create(destination)
if err != nil {
return fmt.Errorf("could not create file %s: %w", destination, err)
}

err := utils.Copy(b.fs, sourcePath, destination)
_, err = io.Copy(output, reader)
if err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions cmd/internal/backup/providers/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ func Test_BackupProviderLocal(t *testing.T) {
err = afero.WriteFile(fs, backupPath, []byte(backupContent), 0600)
require.NoError(t, err)

err = p.UploadBackup(ctx, backupPath)
infile, err := fs.Open(backupPath)
require.NoError(t, err)

err = p.UploadBackup(ctx, infile, backupPath)
require.NoError(t, err)

localPath := path.Join(localProviderBackupPath, backupName)
_, err := fs.Stat(localPath)
_, err = fs.Stat(localPath)
require.NoError(t, err)

backupFiles, err := afero.ReadDir(fs, localProviderBackupPath)
Expand Down Expand Up @@ -130,16 +133,19 @@ func Test_BackupProviderLocal(t *testing.T) {
latestVersion := versions.Latest()
require.NotNil(t, latestVersion)

backupFilePath, err := p.DownloadBackup(ctx, latestVersion, "")
outputFile, err := fs.Create("output.tar.gz")
require.NoError(t, err)

err = p.DownloadBackup(ctx, latestVersion, outputFile)
require.NoError(t, err)

gotContent, err := afero.ReadFile(fs, backupFilePath)
gotContent, err := afero.ReadFile(fs, outputFile.Name())
require.NoError(t, err)

require.Equal(t, fmt.Sprintf("precious data %d", backupAmount), string(gotContent))

// cleaning up after test
err = fs.Remove(backupFilePath)
err = fs.Remove(outputFile.Name())
require.NoError(t, err)
})

Expand Down
43 changes: 15 additions & 28 deletions cmd/internal/backup/providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package s3

import (
"context"
"io"
"log/slog"
"path/filepath"
"strings"

"errors"

Expand Down Expand Up @@ -189,49 +189,36 @@ func (b *BackupProviderS3) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderS3) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderS3) DownloadBackup(ctx context.Context, version *providers.BackupVersion, writer io.Writer) error {
bucket := aws.String(b.config.BucketName)

downloadFileName := version.Name
if strings.Contains(downloadFileName, "/") {
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

f, err := b.fs.Create(backupFilePath)
if err != nil {
return "", err
}
defer f.Close()

downloader := s3manager.NewDownloader(b.sess)

_, err = downloader.DownloadWithContext(
buf := aws.NewWriteAtBuffer([]byte{})
ostempel marked this conversation as resolved.
Show resolved Hide resolved
_, err := downloader.DownloadWithContext(
ctx,
f,
buf,
&s3.GetObjectInput{
Bucket: bucket,
Key: &version.Name,
VersionId: &version.Version,
})
if err != nil {
return "", err
return err
}

_, err = writer.Write(buf.Bytes())
if err != nil {
return err
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderS3) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderS3) UploadBackup(ctx context.Context, reader io.Reader, sourcePath string) error {
bucket := aws.String(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
Expand All @@ -240,10 +227,10 @@ func (b *BackupProviderS3) UploadBackup(ctx context.Context, sourcePath string)
b.log.Debug("uploading object", "src", sourcePath, "dest", destination)

uploader := s3manager.NewUploader(b.sess)
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: bucket,
Key: aws.String(destination),
Body: r,
Body: reader,
})
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion cmd/internal/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,12 @@ func (i *Initializer) Restore(ctx context.Context, version *providers.BackupVers
return fmt.Errorf("could not delete priorly downloaded file: %w", err)
}

backupFilePath, err := i.bp.DownloadBackup(ctx, version, constants.DownloadDir)
outputFile, err := os.Open(backupFilePath)
if err != nil {
return fmt.Errorf("could not open file for writing: %w", err)
}

err = i.bp.DownloadBackup(ctx, version, outputFile)
if err != nil {
return fmt.Errorf("unable to download backup: %w", err)
}
Expand Down
13 changes: 10 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"

v1 "github.com/metal-stack/backup-restore-sidecar/api/v1"
Expand Down Expand Up @@ -300,15 +301,21 @@ var downloadBackupCmd = &cobra.Command{

output := viper.GetString(downloadOutputFlg)

destination, err := bp.DownloadBackup(context.Background(), &providers.BackupVersion{Name: backup.GetBackup().GetName()}, output)
outputPath := filepath.Join(output, backup.GetBackup().GetName())
outputFile, err := os.Open(outputPath)
if err != nil {
return fmt.Errorf("failed opening output file: %w", err)
}

err = bp.DownloadBackup(context.Background(), &providers.BackupVersion{Name: backup.GetBackup().GetName()}, outputFile)

if err != nil {
return fmt.Errorf("failed downloading backup: %w", err)
}

if encrypter != nil {
if encryption.IsEncrypted(destination) {
_, err = encrypter.Decrypt(destination)
if encryption.IsEncrypted(outputPath) {
_, err = encrypter.Decrypt(outputPath)
if err != nil {
return fmt.Errorf("unable to decrypt backup: %w", err)
}
Expand Down
Loading