Skip to content

Commit

Permalink
Revert DownloadFile()
Browse files Browse the repository at this point in the history
  • Loading branch information
maurycy authored Oct 27, 2020
1 parent 0c6f0ac commit 3c0cb8f
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 8 deletions.
5 changes: 5 additions & 0 deletions core/textile/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type BucketInterface interface {
path string,
reader io.Reader,
) (result path.Resolved, root path.Path, err error)
DownloadFile(
ctx context.Context,
path string,
reader io.Reader,
) (result path.Resolved, root path.Path, err error)
GetFile(
ctx context.Context,
path string,
Expand Down
19 changes: 19 additions & 0 deletions core/textile/bucket/bucket_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ func (b *Bucket) UploadFile(ctx context.Context, path string, reader io.Reader)
return result, root, nil
}

func (b *Bucket) DownloadFile(ctx context.Context, path string, reader io.Reader) (result path.Resolved, root path.Path, err error) {
b.lock.Lock()
defer b.lock.Unlock()

ctx, _, err = b.GetContext(ctx)
if err != nil {
return nil, nil, err
}

result, root, err = b.bucketsClient.PushPath(ctx, b.Key(), path, reader)
if err != nil {
return nil, nil, err
}

// no notification

return result, root, nil
}

// GetFile pulls path from bucket writing it to writer if it's a file.
func (b *Bucket) GetFile(ctx context.Context, path string, w io.Writer) error {
b.lock.RLock()
Expand Down
37 changes: 37 additions & 0 deletions core/textile/sync/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,43 @@ func (s *synchronizer) uploadFileToBucket(ctx context.Context, sourceBucket, tar
return nil
}

func (s *synchronizer) downloadFile(ctx context.Context, sourceBucket, targetBucket bucket.BucketInterface, path string) error {

pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

errc := make(chan error, 1)
// go routine for piping
go func() {
defer close(errc)
defer pipeWriter.Close()

if err := sourceBucket.GetFile(ctx, path, pipeWriter); err != nil {
errc <- err
return
}

errc <- nil
}()

if _, _, err := targetBucket.DownloadFile(ctx, path, pipeReader); err != nil {
return err
}

if err := <-errc; err != nil {
return err
}

if err := s.addCurrentUserAsFileOwner(ctx, targetBucket.Slug(), path); err != nil {
// not returning since we dont want to halt the whole process
// also acl will still work since they are the owner
// of the thread so this is more for showing members view
log.Error("Unable to push path access roles for owner", err)
}

return nil
}

// backup all files in a bucket
func (s *synchronizer) uploadAllFilesInPath(ctx context.Context, bucket, path string) error {
localBucket, err := s.getBucket(ctx, bucket)
Expand Down
28 changes: 21 additions & 7 deletions core/textile/sync/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ import (
"github.com/FleekHQ/space-daemon/log"
)

// return the targetBucket if path is newer there, srcBucket otherwise
func (s *synchronizer) newerBucketPath(ctx context.Context, srcBucket, targetBucket bucket.BucketInterface, path string) (bucket.BucketInterface, error) {
srcUpdatedAt, err := srcBucket.UpdatedAt(ctx, path)
if err != nil {
return nil, err
}

targetUpdatedAt, err := targetBucket.UpdatedAt(ctx, path)
if err != nil {
return nil, err
}

if srcUpdatedAt >= targetUpdatedAt {
return srcBucket, nil
}

return targetBucket, nil
}

// restore bucket by downloading files to the local from the mirror bucket
func (s *synchronizer) restoreBucket(ctx context.Context, bucketSlug string) error {

Expand All @@ -31,17 +50,12 @@ func (s *synchronizer) restoreBucket(ctx context.Context, bucketSlug string) err
}

if exists {
localUpdatedAt, err := localBucket.UpdatedAt(c, itemPath)
if err != nil {
return err
}

mirrorUpdatedAt, err := mirrorBucket.UpdatedAt(c, itemPath)
newerBucket, err := s.newerBucketPath(c, localBucket, mirrorBucket, itemPath)
if err != nil {
return err
}

if localUpdatedAt >= mirrorUpdatedAt {
if newerBucket == localBucket {
// do not overwrite: mirror is not newer
return nil
}
Expand Down
16 changes: 15 additions & 1 deletion core/textile/sync/task-executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"

"github.com/FleekHQ/space-daemon/core/events"
"github.com/FleekHQ/space-daemon/core/textile/utils"
"github.com/FleekHQ/space-daemon/log"
)

func checkTaskType(t *Task, tp taskType) error {
Expand Down Expand Up @@ -205,6 +207,8 @@ func (s *synchronizer) processBucketRestoreTask(ctx context.Context, task *Task)
}

func (s *synchronizer) processRestoreFile(ctx context.Context, task *Task) error {
log.Debug(fmt.Sprintf("processRestoreFile: 1"))

if err := checkTaskType(task, restoreFileTask); err != nil {
return err
}
Expand All @@ -222,9 +226,19 @@ func (s *synchronizer) processRestoreFile(ctx context.Context, task *Task) error
return err
}

newerBucket, err := s.newerBucketPath(ctx, localBucket, mirrorBucket, path)
if err != nil {
return err
}

if newerBucket == localBucket {
// do not overwrite: mirror is not newer
return nil
}

// TODO: use timestamp or CID for check

if err = s.uploadFileToBucket(ctx, mirrorBucket, localBucket, path); err != nil {
if err = s.downloadFile(ctx, mirrorBucket, localBucket, path); err != nil {
return err
}

Expand Down
32 changes: 32 additions & 0 deletions mocks/Bucket.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3c0cb8f

Please sign in to comment.