Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add streaming to upload/download of backup #102

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/internal/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func (b *Backuper) CreateBackup(ctx context.Context) error {
b.log.Info("encrypted backup")
}

err = b.bp.UploadBackup(ctx, filename)
file, err := os.Open(filename)
if err != nil {
b.metrics.CountError("open")
return fmt.Errorf("error opening backup file: %w", err)
}

err = b.bp.UploadBackup(ctx, file)
if err != nil {
b.metrics.CountError("upload")
return fmt.Errorf("error uploading backup: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions cmd/internal/backup/providers/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package providers

import (
"context"
"io"
"time"
)

Expand All @@ -10,8 +11,8 @@ type BackupProvider interface {
ListBackups(ctx context.Context) (BackupVersions, error)
CleanupBackups(ctx context.Context) error
GetNextBackupName(ctx context.Context) string
DownloadBackup(ctx context.Context, version *BackupVersion, outDir string) (string, error)
UploadBackup(ctx context.Context, sourcePath string) error
DownloadBackup(ctx context.Context, version *BackupVersion, writer io.Writer) error
UploadBackup(ctx context.Context, reader io.Reader) error
}

type BackupVersions interface {
Expand Down
75 changes: 33 additions & 42 deletions cmd/internal/backup/providers/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"io"
"log/slog"
"net/http"
"path/filepath"
"strconv"
"strings"

"errors"

"github.com/metal-stack/backup-restore-sidecar/cmd/internal/backup/providers"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/compress"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/encryption"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"

Expand All @@ -24,15 +24,17 @@ import (
)

const (
defaultBackupName = "db"
ProviderConstant = "db"
)

// BackupProviderGCP implements the backup provider interface for GCP
type BackupProviderGCP struct {
fs afero.Fs
log *slog.Logger
c *storage.Client
config *BackupProviderConfigGCP
fs afero.Fs
log *slog.Logger
c *storage.Client
config *BackupProviderConfigGCP
encrypter *encryption.Encrypter
compressor *compress.Compressor
}

// BackupProviderConfigGCP provides configuration for the BackupProviderGCP
Expand All @@ -45,6 +47,8 @@ type BackupProviderConfigGCP struct {
ProjectID string
FS afero.Fs
ClientOpts []option.ClientOption
Encrypter *encryption.Encrypter
Compressor *compress.Compressor
}

func (c *BackupProviderConfigGCP) validate() error {
Expand Down Expand Up @@ -72,9 +76,6 @@ func New(ctx context.Context, log *slog.Logger, config *BackupProviderConfigGCP)
if config.ObjectsToKeep == 0 {
config.ObjectsToKeep = constants.DefaultObjectsToKeep
}
if config.BackupName == "" {
config.BackupName = defaultBackupName
}
if config.FS == nil {
config.FS = afero.NewOsFs()
}
Expand All @@ -90,10 +91,12 @@ func New(ctx context.Context, log *slog.Logger, config *BackupProviderConfigGCP)
}

return &BackupProviderGCP{
c: client,
config: config,
log: log,
fs: config.FS,
c: client,
config: config,
log: log,
fs: config.FS,
compressor: config.Compressor,
encrypter: config.Encrypter,
}, nil
}

Expand Down Expand Up @@ -147,63 +150,51 @@ func (b *BackupProviderGCP) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderGCP) DownloadBackup(ctx context.Context, version *providers.BackupVersion, writer io.Writer) error {
gen, err := strconv.ParseInt(version.Version, 10, 64)
if err != nil {
return "", err
return err
}

bucket := b.c.Bucket(b.config.BucketName)

downloadFileName := version.Name
if strings.Contains(downloadFileName, "/") {
downloadFileName = filepath.Base(downloadFileName)
}

backupFilePath := filepath.Join(outDir, downloadFileName)

b.log.Info("downloading", "object", version.Name, "gen", gen, "to", backupFilePath)
ostempel marked this conversation as resolved.
Show resolved Hide resolved
b.log.Info("downloading", "object", version.Name, "gen", gen)

r, err := bucket.Object(version.Name).Generation(gen).NewReader(ctx)
if err != nil {
return "", fmt.Errorf("backup not found: %w", err)
return fmt.Errorf("backup not found: %w", err)
}
defer r.Close()

f, err := b.fs.Create(backupFilePath)
_, err = io.Copy(writer, r)
if err != nil {
return "", err
return fmt.Errorf("error writing file from gcp to filesystem: %w", err)
}
defer f.Close()

_, err = io.Copy(f, r)
if err != nil {
return "", fmt.Errorf("error writing file from gcp to filesystem: %w", err)
}

return backupFilePath, nil
return nil
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, sourcePath string) error {
func (b *BackupProviderGCP) UploadBackup(ctx context.Context, reader io.Reader) error {
bucket := b.c.Bucket(b.config.BucketName)

r, err := b.fs.Open(sourcePath)
if err != nil {
return err
destination := ProviderConstant
if b.compressor != nil {
destination += b.compressor.Extension()
}
if b.encrypter != nil {
destination += b.encrypter.Extension()
}
defer r.Close()

destination := filepath.Base(sourcePath)
if b.config.ObjectPrefix != "" {
destination = b.config.ObjectPrefix + "/" + destination
}

b.log.Debug("uploading object", "src", sourcePath, "dest", destination)
b.log.Debug("uploading object", "dest", destination)

obj := bucket.Object(destination)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
if _, err := io.Copy(w, reader); err != nil {
return err
}
defer w.Close()
Expand Down
21 changes: 16 additions & 5 deletions cmd/internal/backup/providers/gcp/gcp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/docker/docker/api/types/container"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/compress"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,7 +54,7 @@ func Test_BackupProviderGCP(t *testing.T) {
var (
endpoint = conn.Endpoint + "/storage/v1/"
backupAmount = 5
expectedBackupName = defaultBackupName + ".tar.gz"
expectedBackupName = "db.tar.gz"
prefix = fmt.Sprintf("test-with-%d", backupAmount)

fs = afero.NewMemMapFs()
Expand All @@ -64,14 +65,19 @@ func Test_BackupProviderGCP(t *testing.T) {
httpClient = &http.Client{Transport: transCfg}
)

compressor, err := compress.New("targz")
require.NoError(t, err)

p, err := New(ctx, log, &BackupProviderConfigGCP{
BucketName: "test",
BucketLocation: "europe-west3",
ObjectPrefix: prefix,
ProjectID: "test-project-id",
FS: fs,
ClientOpts: []option.ClientOption{option.WithEndpoint(endpoint), option.WithHTTPClient(httpClient)},
Compressor: compressor,
})

require.NoError(t, err)
require.NotNil(t, p)

Expand All @@ -95,7 +101,9 @@ func Test_BackupProviderGCP(t *testing.T) {
err = afero.WriteFile(fs, backupPath, []byte(backupContent), 0600)
require.NoError(t, err)

err = p.UploadBackup(ctx, backupPath)
backupFile, err := fs.Open(backupPath)
require.NoError(t, err)
err = p.UploadBackup(ctx, backupFile)
require.NoError(t, err)

// cleaning up after test
Expand Down Expand Up @@ -153,17 +161,20 @@ func Test_BackupProviderGCP(t *testing.T) {
latestVersion := versions.Latest()
require.NotNil(t, latestVersion)

backupFilePath, err := p.DownloadBackup(ctx, latestVersion, "")
outputFile, err := fs.Create("outputfile")
require.NoError(t, err)

err = p.DownloadBackup(ctx, latestVersion, outputFile)
require.NoError(t, err)

gotContent, err := afero.ReadFile(fs, backupFilePath)
gotContent, err := afero.ReadFile(fs, outputFile.Name())
require.NoError(t, err)

backupContent := fmt.Sprintf("precious data %d", backupAmount-1)
require.Equal(t, backupContent, string(gotContent))

// cleaning up after test
err = fs.Remove(backupFilePath)
err = fs.Remove(outputFile.Name())
require.NoError(t, err)
})

Expand Down
53 changes: 38 additions & 15 deletions cmd/internal/backup/providers/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
Expand All @@ -11,13 +12,15 @@ import (
"errors"

"github.com/metal-stack/backup-restore-sidecar/cmd/internal/backup/providers"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/utils"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/compress"
"github.com/metal-stack/backup-restore-sidecar/cmd/internal/encryption"
"github.com/metal-stack/backup-restore-sidecar/pkg/constants"
"github.com/spf13/afero"
)

const (
defaultLocalBackupPath = constants.SidecarBaseDir + "/local-provider"
ProviderConstant = "db"
)

// BackupProviderLocal implements the backup provider interface for no backup provider (useful to disable sidecar functionality in development environments)
Expand All @@ -26,13 +29,17 @@ type BackupProviderLocal struct {
log *slog.Logger
config *BackupProviderConfigLocal
nextBackupCount int64
encrypter *encryption.Encrypter
compressor *compress.Compressor
}

// BackupProviderConfigLocal provides configuration for the BackupProviderLocal
type BackupProviderConfigLocal struct {
LocalBackupPath string
ObjectsToKeep int64
FS afero.Fs
Encrypter *encryption.Encrypter
Compressor *compress.Compressor
}

func (c *BackupProviderConfigLocal) validate() error {
Expand Down Expand Up @@ -61,9 +68,11 @@ func New(log *slog.Logger, config *BackupProviderConfigLocal) (*BackupProviderLo
}

return &BackupProviderLocal{
config: config,
log: log,
fs: config.FS,
config: config,
log: log,
fs: config.FS,
encrypter: config.Encrypter,
compressor: config.Compressor,
}, nil
}

Expand All @@ -86,28 +95,44 @@ func (b *BackupProviderLocal) CleanupBackups(_ context.Context) error {
}

// DownloadBackup downloads the given backup version to the specified folder
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, outDir string) (string, error) {
func (b *BackupProviderLocal) DownloadBackup(_ context.Context, version *providers.BackupVersion, writer io.Writer) error {
b.log.Info("download backup called for provider local")

source := filepath.Join(b.config.LocalBackupPath, version.Name)

backupFilePath := filepath.Join(outDir, version.Name)
infile, err := b.fs.Open(source)
if err != nil {
return fmt.Errorf("could not open file %s: %w", source, err)
}
defer infile.Close()

err := utils.Copy(b.fs, source, backupFilePath)
_, err = io.Copy(writer, infile)
if err != nil {
return "", err
return err
}

return backupFilePath, err
return err
}

// UploadBackup uploads a backup to the backup provider
func (b *BackupProviderLocal) UploadBackup(_ context.Context, sourcePath string) error {
// UploadBackup uploads a backup to the backup provider by providing a reader to the backup archive
func (b *BackupProviderLocal) UploadBackup(ctx context.Context, reader io.Reader) error {
b.log.Info("upload backups called for provider local")

destination := filepath.Join(b.config.LocalBackupPath, filepath.Base(sourcePath))
destination := b.config.LocalBackupPath + "/" + b.GetNextBackupName(ctx)
b.nextBackupCount++
b.nextBackupCount = b.nextBackupCount % b.config.ObjectsToKeep
if b.compressor != nil {
destination = destination + b.compressor.Extension()
}
if b.encrypter != nil {
destination = destination + b.encrypter.Extension()
}
output, err := b.fs.Create(destination)
if err != nil {
return fmt.Errorf("could not create file %s: %w", destination, err)
}

err := utils.Copy(b.fs, sourcePath, destination)
_, err = io.Copy(output, reader)
if err != nil {
return err
}
Expand All @@ -118,8 +143,6 @@ func (b *BackupProviderLocal) UploadBackup(_ context.Context, sourcePath string)
// GetNextBackupName returns a name for the next backup archive that is going to be uploaded
func (b *BackupProviderLocal) GetNextBackupName(_ context.Context) string {
name := strconv.FormatInt(b.nextBackupCount, 10)
b.nextBackupCount++
b.nextBackupCount = b.nextBackupCount % b.config.ObjectsToKeep
return name
}

Expand Down
Loading
Loading