Skip to content

Commit

Permalink
Add backup progress status and notification (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 authored Oct 5, 2020
1 parent 81dd1e5 commit 5dd79d6
Show file tree
Hide file tree
Showing 18 changed files with 135 additions and 54 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (a *App) Start(ctx context.Context) error {
)

textileClient.AttachMailboxNotifier(srv)
textileClient.AttachSynchronizerNotifier(srv)
err = a.RunAsync("BucketSync", bucketSync, func() error {
bucketSync.RegisterNotifier(srv)
return bucketSync.Start(ctx)
Expand Down
24 changes: 14 additions & 10 deletions core/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
type FileEventType string

const (
FileAdded FileEventType = "FileAdded"
FileDeleted FileEventType = "FileDeleted"
FileUpdated FileEventType = "FileUpdated"
FileAdded FileEventType = "FileAdded"
FileDeleted FileEventType = "FileDeleted"
FileUpdated FileEventType = "FileUpdated"
FileBackupInProgress FileEventType = "FileBackupInProgress"
FileBackupReady FileEventType = "FileBackupReady"

FolderAdded FileEventType = "FolderAdded"
FolderDeleted FileEventType = "FolderDeleted"
Expand All @@ -20,16 +22,18 @@ const (
)

type FileEvent struct {
Path string
Info os.FileInfo
Type FileEventType
Path string
Bucket string
Info os.FileInfo
Type FileEventType
}

func NewFileEvent(path string, eventType FileEventType, info os.FileInfo) FileEvent {
func NewFileEvent(path, bucket string, eventType FileEventType, info os.FileInfo) FileEvent {
return FileEvent{
Path: path,
Type: eventType,
Info: info,
Path: path,
Bucket: bucket,
Type: eventType,
Info: info,
}
}

Expand Down
10 changes: 6 additions & 4 deletions core/space/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type FileInfo struct {
IpfsHash string
BackedUp bool
LocallyAvailable bool
BackupInProgress bool
}

type OpenFileInfo struct {
Expand Down Expand Up @@ -150,10 +151,11 @@ type APISessionTokens struct {
}

type MirrorFile struct {
Path string
BucketSlug string
Backup bool
Shared bool
Path string
BucketSlug string
Backup bool
Shared bool
BackupInProgress bool
}

type SharedDirEntry struct {
Expand Down
3 changes: 3 additions & 0 deletions core/space/services/services_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ func (s *Space) listDirAtPath(
}

backedup := false
backupInProgress := false
if mirror_files[item.Path] != nil {
backedup = mirror_files[item.Path].Backup
backupInProgress = mirror_files[item.Path].BackupInProgress
}

locallyAvailable := false
Expand All @@ -220,6 +222,7 @@ func (s *Space) listDirAtPath(
IpfsHash: item.Cid,
BackedUp: backedup,
LocallyAvailable: locallyAvailable,
BackupInProgress: backupInProgress,
}
entries = append(entries, entry)

Expand Down
4 changes: 4 additions & 0 deletions core/textile/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,7 @@ func (tc *textileClient) requiresHubConnection() error {
}
return nil
}

func (tc *textileClient) AttachSynchronizerNotifier(notif synchronizer.EventNotifier) {
tc.sync.AttachNotifier(notif)
}
42 changes: 25 additions & 17 deletions core/textile/model/mirror_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
)

type MirrorFileSchema struct {
ID core.InstanceID `json:"_id"`
Path string `json:"path"`
BucketSlug string `json:"bucket_slug"`
Backup bool `json:"backup"`
Shared bool `json:"shared"`

DbID string
ID core.InstanceID `json:"_id"`
Path string `json:"path"`
BucketSlug string `json:"bucket_slug"`
Backup bool `json:"backup"`
Shared bool `json:"shared"`
BackupInProgress bool `json:"backupInProgress"`
DbID string
}

type MirrorBucketSchema struct {
Expand Down Expand Up @@ -141,10 +141,11 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF
}

newInstance := &MirrorFileSchema{
Path: mirrorFile.Path,
BucketSlug: mirrorFile.BucketSlug,
Backup: mirrorFile.Backup,
Shared: mirrorFile.Shared,
Path: mirrorFile.Path,
BucketSlug: mirrorFile.BucketSlug,
Backup: mirrorFile.Backup,
BackupInProgress: mirrorFile.BackupInProgress,
Shared: mirrorFile.Shared,
}

instances := client.Instances{newInstance}
Expand All @@ -156,12 +157,13 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF

id := res[0]
return &MirrorFileSchema{
Path: newInstance.Path,
BucketSlug: newInstance.BucketSlug,
Backup: newInstance.Backup,
Shared: newInstance.Shared,
ID: core.InstanceID(id),
DbID: newInstance.DbID,
Path: newInstance.Path,
BucketSlug: newInstance.BucketSlug,
Backup: newInstance.Backup,
BackupInProgress: newInstance.BackupInProgress,
Shared: newInstance.Shared,
ID: core.InstanceID(id),
DbID: newInstance.DbID,
}, nil
}

Expand Down Expand Up @@ -209,5 +211,11 @@ func (m *model) initMirrorFileModel(ctx context.Context) (context.Context, *thre
}},
})

// Migrates db by adding new fields between old version of the daemon and a new one
m.threads.UpdateCollection(metaCtx, *dbID, db.CollectionConfig{
Name: mirrorFileModelName,
Schema: util.SchemaFromInstance(&MirrorFileSchema{}, false),
})

return metaCtx, dbID, nil
}
4 changes: 4 additions & 0 deletions core/textile/sharing.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ func (tc *textileClient) GetReceivedFiles(ctx context.Context, accepted bool, se
IpfsHash: ipfsHash,
LocallyAvailable: false,
BackedUp: true,

// TODO: Reflect correct state when we add local updates syncing to remote
BackupInProgress: false,

DirEntry: domain.DirEntry{
Path: file.Path,
IsDir: isDir,
Expand Down
14 changes: 8 additions & 6 deletions core/textile/sync/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ import (

const mirrorThreadKeyName = "mirrorV1"

func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string) error {
func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string, isInProgress bool) error {
mf, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug)
if err != nil {
return err
}
if mf != nil {
// update
mf.Backup = true
mf.Backup = !isInProgress
mf.BackupInProgress = isInProgress

_, err = s.model.UpdateMirrorFile(ctx, mf)
if err != nil {
Expand All @@ -33,10 +34,11 @@ func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug
} else {
// create
mf := &domain.MirrorFile{
Path: path,
BucketSlug: bucketSlug,
Backup: true,
Shared: false,
Path: path,
BucketSlug: bucketSlug,
Backup: !isInProgress,
BackupInProgress: isInProgress,
Shared: false,
}

_, err := s.model.CreateMirrorFile(ctx, mf)
Expand Down
7 changes: 7 additions & 0 deletions core/textile/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ package sync

import (
"context"

"github.com/FleekHQ/space-daemon/core/events"
)

type EventNotifier interface {
SendFileEvent(event events.FileEvent)
}

type Synchronizer interface {
NotifyItemAdded(bucket, path string)
NotifyItemRemoved(bucket, path string)
Expand All @@ -14,4 +20,5 @@ type Synchronizer interface {
RestoreQueue() error
Shutdown()
String() string
AttachNotifier(EventNotifier)
}
4 changes: 2 additions & 2 deletions core/textile/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ func TestSync_ProcessTask(t *testing.T) {

// Makes the processAddItem and processPinFilefail right away
mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error"))
mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error"))
// mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error"))

mockStore.On("Set", mock.Anything, mock.Anything).Return(nil)

s.Start(ctx)

s.Shutdown()

expectedState := "Textile sync [file pinning]: Total: 1, Queued: 1, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n"
expectedState := "Textile sync [file pinning]: Total: 0, Queued: 0, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n"

assert.Equal(t, expectedState, s.String())
mockModel.AssertExpectations(t)
Expand Down
13 changes: 5 additions & 8 deletions core/textile/sync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type synchronizer struct {
cfg config.Config
netc *nc.Client
queueWg *sync.WaitGroup
eventNotifier EventNotifier
}

// Creates a new Synchronizer
Expand Down Expand Up @@ -103,10 +104,6 @@ func (s *synchronizer) NotifyItemAdded(bucket, path string) {
t := newTask(addItemTask, []string{bucket, path})
s.enqueueTask(t, s.taskQueue)

pft := newTask(pinFileTask, []string{bucket, path})
pft.Parallelizable = true
s.enqueueTask(pft, s.filePinningQueue)

s.notifySyncNeeded()
}

Expand All @@ -115,10 +112,6 @@ func (s *synchronizer) NotifyItemRemoved(bucket, path string) {
t := newTask(removeItemTask, []string{bucket, path})
s.enqueueTask(t, s.taskQueue)

uft := newTask(unpinFileTask, []string{bucket, path})
uft.Parallelizable = true
s.enqueueTask(uft, s.filePinningQueue)

s.notifySyncNeeded()
}

Expand Down Expand Up @@ -348,3 +341,7 @@ func (s *synchronizer) sync(ctx context.Context, queue *list.List) error {

return nil
}

func (s *synchronizer) AttachNotifier(notif EventNotifier) {
s.eventNotifier = notif
}
28 changes: 25 additions & 3 deletions core/textile/sync/task-executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"

"github.com/FleekHQ/space-daemon/core/events"
"github.com/FleekHQ/space-daemon/core/textile/utils"
)

Expand Down Expand Up @@ -32,11 +33,21 @@ func (s *synchronizer) processAddItem(ctx context.Context, task *Task) error {
mirrorFile, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucket)

if bucketModel.Backup && mirrorFile == nil {
if err := s.setMirrorFileBackup(ctx, path, bucket); err != nil {
if err := s.setMirrorFileBackup(ctx, path, bucket, true); err != nil {
return err
}
}

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupInProgress, nil))
}

pft := newTask(pinFileTask, []string{bucket, path})
pft.Parallelizable = true
s.enqueueTask(pft, s.filePinningQueue)

s.notifySyncNeeded()

return nil
}

Expand All @@ -45,8 +56,14 @@ func (s *synchronizer) processRemoveItem(ctx context.Context, task *Task) error
return err
}

// bucket := task.Args[0]
// path := task.Args[1]
bucket := task.Args[0]
path := task.Args[1]

uft := newTask(unpinFileTask, []string{bucket, path})
uft.Parallelizable = true
s.enqueueTask(uft, s.filePinningQueue)

s.notifySyncNeeded()

// TODO: Remove file from mirror
return nil
Expand All @@ -61,6 +78,11 @@ func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error {
path := task.Args[1]

err := s.uploadFileToRemote(ctx, bucket, path)
s.setMirrorFileBackup(ctx, path, bucket, false)

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupReady, nil))
}

return err
}
Expand Down
2 changes: 2 additions & 0 deletions core/textile/textile.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/FleekHQ/space-daemon/core/space/domain"
"github.com/FleekHQ/space-daemon/core/textile/bucket"
"github.com/FleekHQ/space-daemon/core/textile/model"
"github.com/FleekHQ/space-daemon/core/textile/sync"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/textileio/go-threads/db"

Expand Down Expand Up @@ -58,6 +59,7 @@ type Client interface {
RejectSharedFilesInvitation(ctx context.Context, invitation domain.Invitation) (domain.Invitation, error)
RemoveKeys() error
AttachMailboxNotifier(notif GrpcMailboxNotifier)
AttachSynchronizerNotifier(notif sync.EventNotifier)
GetReceivedFiles(ctx context.Context, accepted bool, seek string, limit int) ([]*domain.SharedDirEntry, string, error)
GetPathAccessRoles(ctx context.Context, b Bucket, path string) ([]domain.Member, error)
GetPublicShareBucket(ctx context.Context) (Bucket, error)
Expand Down
4 changes: 3 additions & 1 deletion grpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var errNotImplemented = errors.New("Not implemented")

func (srv *grpcServer) sendFileEvent(event *pb.FileEventResponse) {
if srv.fileEventStream != nil {
log.Info("sending events to client")
log.Info("sending events to client", event.String())
srv.fileEventStream.Send(event)
}
}
Expand Down Expand Up @@ -75,6 +75,7 @@ func (srv *grpcServer) ListDirectories(ctx context.Context, request *pb.ListDire
Members: members,
IsLocallyAvailable: e.LocallyAvailable,
BackupCount: int64(backupCount),
IsBackupInProgress: e.BackupInProgress,
}
dirEntries = append(dirEntries, dirEntry)
}
Expand Down Expand Up @@ -124,6 +125,7 @@ func (srv *grpcServer) ListDirectory(
Members: members,
BackupCount: int64(backupCount),
IsLocallyAvailable: e.LocallyAvailable,
IsBackupInProgress: e.BackupInProgress,
}
dirEntries = append(dirEntries, dirEntry)
}
Expand Down
Loading

0 comments on commit 5dd79d6

Please sign in to comment.