diff --git a/app/app.go b/app/app.go index 676c3a43..45fdda59 100644 --- a/app/app.go +++ b/app/app.go @@ -178,6 +178,7 @@ func (a *App) Start(ctx context.Context) error { ) textileClient.AttachMailboxNotifier(srv) + textileClient.AttachSynchronizerNotifier(srv) err = a.RunAsync("BucketSync", bucketSync, func() error { bucketSync.RegisterNotifier(srv) return bucketSync.Start(ctx) diff --git a/core/events/events.go b/core/events/events.go index 13948e94..666627a4 100644 --- a/core/events/events.go +++ b/core/events/events.go @@ -9,9 +9,11 @@ import ( type FileEventType string const ( - FileAdded FileEventType = "FileAdded" - FileDeleted FileEventType = "FileDeleted" - FileUpdated FileEventType = "FileUpdated" + FileAdded FileEventType = "FileAdded" + FileDeleted FileEventType = "FileDeleted" + FileUpdated FileEventType = "FileUpdated" + FileBackupInProgress FileEventType = "FileBackupInProgress" + FileBackupReady FileEventType = "FileBackupReady" FolderAdded FileEventType = "FolderAdded" FolderDeleted FileEventType = "FolderDeleted" @@ -20,16 +22,18 @@ const ( ) type FileEvent struct { - Path string - Info os.FileInfo - Type FileEventType + Path string + Bucket string + Info os.FileInfo + Type FileEventType } -func NewFileEvent(path string, eventType FileEventType, info os.FileInfo) FileEvent { +func NewFileEvent(path, bucket string, eventType FileEventType, info os.FileInfo) FileEvent { return FileEvent{ - Path: path, - Type: eventType, - Info: info, + Path: path, + Bucket: bucket, + Type: eventType, + Info: info, } } diff --git a/core/space/domain/domain.go b/core/space/domain/domain.go index c3cdf951..31b644ab 100644 --- a/core/space/domain/domain.go +++ b/core/space/domain/domain.go @@ -28,6 +28,7 @@ type FileInfo struct { IpfsHash string BackedUp bool LocallyAvailable bool + BackupInProgress bool } type OpenFileInfo struct { @@ -150,10 +151,11 @@ type APISessionTokens struct { } type MirrorFile struct { - Path string - BucketSlug string - Backup bool - Shared bool + Path string + BucketSlug string + Backup bool + Shared bool + BackupInProgress bool } type SharedDirEntry struct { diff --git a/core/space/services/services_fs.go b/core/space/services/services_fs.go index 2a51386f..fc6b0b7c 100644 --- a/core/space/services/services_fs.go +++ b/core/space/services/services_fs.go @@ -196,8 +196,10 @@ func (s *Space) listDirAtPath( } backedup := false + backupInProgress := false if mirror_files[item.Path] != nil { backedup = mirror_files[item.Path].Backup + backupInProgress = mirror_files[item.Path].BackupInProgress } locallyAvailable := false @@ -220,6 +222,7 @@ func (s *Space) listDirAtPath( IpfsHash: item.Cid, BackedUp: backedup, LocallyAvailable: locallyAvailable, + BackupInProgress: backupInProgress, } entries = append(entries, entry) diff --git a/core/textile/client.go b/core/textile/client.go index 2c6a614b..a8c53b13 100644 --- a/core/textile/client.go +++ b/core/textile/client.go @@ -501,3 +501,7 @@ func (tc *textileClient) requiresHubConnection() error { } return nil } + +func (tc *textileClient) AttachSynchronizerNotifier(notif synchronizer.EventNotifier) { + tc.sync.AttachNotifier(notif) +} diff --git a/core/textile/model/mirror_file.go b/core/textile/model/mirror_file.go index 092f7585..5391dc85 100644 --- a/core/textile/model/mirror_file.go +++ b/core/textile/model/mirror_file.go @@ -16,13 +16,13 @@ import ( ) type MirrorFileSchema struct { - ID core.InstanceID `json:"_id"` - Path string `json:"path"` - BucketSlug string `json:"bucket_slug"` - Backup bool `json:"backup"` - Shared bool `json:"shared"` - - DbID string + ID core.InstanceID `json:"_id"` + Path string `json:"path"` + BucketSlug string `json:"bucket_slug"` + Backup bool `json:"backup"` + Shared bool `json:"shared"` + BackupInProgress bool `json:"backupInProgress"` + DbID string } type MirrorBucketSchema struct { @@ -141,10 +141,11 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF } newInstance := &MirrorFileSchema{ - Path: mirrorFile.Path, - BucketSlug: mirrorFile.BucketSlug, - Backup: mirrorFile.Backup, - Shared: mirrorFile.Shared, + Path: mirrorFile.Path, + BucketSlug: mirrorFile.BucketSlug, + Backup: mirrorFile.Backup, + BackupInProgress: mirrorFile.BackupInProgress, + Shared: mirrorFile.Shared, } instances := client.Instances{newInstance} @@ -156,12 +157,13 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF id := res[0] return &MirrorFileSchema{ - Path: newInstance.Path, - BucketSlug: newInstance.BucketSlug, - Backup: newInstance.Backup, - Shared: newInstance.Shared, - ID: core.InstanceID(id), - DbID: newInstance.DbID, + Path: newInstance.Path, + BucketSlug: newInstance.BucketSlug, + Backup: newInstance.Backup, + BackupInProgress: newInstance.BackupInProgress, + Shared: newInstance.Shared, + ID: core.InstanceID(id), + DbID: newInstance.DbID, }, nil } @@ -209,5 +211,11 @@ func (m *model) initMirrorFileModel(ctx context.Context) (context.Context, *thre }}, }) + // Migrates db by adding new fields between old version of the daemon and a new one + m.threads.UpdateCollection(metaCtx, *dbID, db.CollectionConfig{ + Name: mirrorFileModelName, + Schema: util.SchemaFromInstance(&MirrorFileSchema{}, false), + }) + return metaCtx, dbID, nil } diff --git a/core/textile/sharing.go b/core/textile/sharing.go index f9ef13e2..520c5cbd 100644 --- a/core/textile/sharing.go +++ b/core/textile/sharing.go @@ -199,6 +199,10 @@ func (tc *textileClient) GetReceivedFiles(ctx context.Context, accepted bool, se IpfsHash: ipfsHash, LocallyAvailable: false, BackedUp: true, + + // TODO: Reflect correct state when we add local updates syncing to remote + BackupInProgress: false, + DirEntry: domain.DirEntry{ Path: file.Path, IsDir: isDir, diff --git a/core/textile/sync/mirror.go b/core/textile/sync/mirror.go index eb4f7598..6bca4487 100644 --- a/core/textile/sync/mirror.go +++ b/core/textile/sync/mirror.go @@ -17,14 +17,15 @@ import ( const mirrorThreadKeyName = "mirrorV1" -func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string) error { +func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string, isInProgress bool) error { mf, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug) if err != nil { return err } if mf != nil { // update - mf.Backup = true + mf.Backup = !isInProgress + mf.BackupInProgress = isInProgress _, err = s.model.UpdateMirrorFile(ctx, mf) if err != nil { @@ -33,10 +34,11 @@ func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug } else { // create mf := &domain.MirrorFile{ - Path: path, - BucketSlug: bucketSlug, - Backup: true, - Shared: false, + Path: path, + BucketSlug: bucketSlug, + Backup: !isInProgress, + BackupInProgress: isInProgress, + Shared: false, } _, err := s.model.CreateMirrorFile(ctx, mf) diff --git a/core/textile/sync/sync.go b/core/textile/sync/sync.go index 8d3d6055..bfb5bc95 100644 --- a/core/textile/sync/sync.go +++ b/core/textile/sync/sync.go @@ -2,8 +2,14 @@ package sync import ( "context" + + "github.com/FleekHQ/space-daemon/core/events" ) +type EventNotifier interface { + SendFileEvent(event events.FileEvent) +} + type Synchronizer interface { NotifyItemAdded(bucket, path string) NotifyItemRemoved(bucket, path string) @@ -14,4 +20,5 @@ type Synchronizer interface { RestoreQueue() error Shutdown() String() string + AttachNotifier(EventNotifier) } diff --git a/core/textile/sync/sync_test.go b/core/textile/sync/sync_test.go index 82963529..34db5b44 100644 --- a/core/textile/sync/sync_test.go +++ b/core/textile/sync/sync_test.go @@ -69,7 +69,7 @@ func TestSync_ProcessTask(t *testing.T) { // Makes the processAddItem and processPinFilefail right away mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error")) - mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + // mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) mockStore.On("Set", mock.Anything, mock.Anything).Return(nil) @@ -77,7 +77,7 @@ func TestSync_ProcessTask(t *testing.T) { s.Shutdown() - expectedState := "Textile sync [file pinning]: Total: 1, Queued: 1, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n" + expectedState := "Textile sync [file pinning]: Total: 0, Queued: 0, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n" assert.Equal(t, expectedState, s.String()) mockModel.AssertExpectations(t) diff --git a/core/textile/sync/synchronizer.go b/core/textile/sync/synchronizer.go index bb941436..34f85768 100644 --- a/core/textile/sync/synchronizer.go +++ b/core/textile/sync/synchronizer.go @@ -47,6 +47,7 @@ type synchronizer struct { cfg config.Config netc *nc.Client queueWg *sync.WaitGroup + eventNotifier EventNotifier } // Creates a new Synchronizer @@ -103,10 +104,6 @@ func (s *synchronizer) NotifyItemAdded(bucket, path string) { t := newTask(addItemTask, []string{bucket, path}) s.enqueueTask(t, s.taskQueue) - pft := newTask(pinFileTask, []string{bucket, path}) - pft.Parallelizable = true - s.enqueueTask(pft, s.filePinningQueue) - s.notifySyncNeeded() } @@ -115,10 +112,6 @@ func (s *synchronizer) NotifyItemRemoved(bucket, path string) { t := newTask(removeItemTask, []string{bucket, path}) s.enqueueTask(t, s.taskQueue) - uft := newTask(unpinFileTask, []string{bucket, path}) - uft.Parallelizable = true - s.enqueueTask(uft, s.filePinningQueue) - s.notifySyncNeeded() } @@ -348,3 +341,7 @@ func (s *synchronizer) sync(ctx context.Context, queue *list.List) error { return nil } + +func (s *synchronizer) AttachNotifier(notif EventNotifier) { + s.eventNotifier = notif +} diff --git a/core/textile/sync/task-executors.go b/core/textile/sync/task-executors.go index 08e98d8a..70b6e42f 100644 --- a/core/textile/sync/task-executors.go +++ b/core/textile/sync/task-executors.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" + "github.com/FleekHQ/space-daemon/core/events" "github.com/FleekHQ/space-daemon/core/textile/utils" ) @@ -32,11 +33,21 @@ func (s *synchronizer) processAddItem(ctx context.Context, task *Task) error { mirrorFile, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucket) if bucketModel.Backup && mirrorFile == nil { - if err := s.setMirrorFileBackup(ctx, path, bucket); err != nil { + if err := s.setMirrorFileBackup(ctx, path, bucket, true); err != nil { return err } } + if s.eventNotifier != nil { + s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupInProgress, nil)) + } + + pft := newTask(pinFileTask, []string{bucket, path}) + pft.Parallelizable = true + s.enqueueTask(pft, s.filePinningQueue) + + s.notifySyncNeeded() + return nil } @@ -45,8 +56,14 @@ func (s *synchronizer) processRemoveItem(ctx context.Context, task *Task) error return err } - // bucket := task.Args[0] - // path := task.Args[1] + bucket := task.Args[0] + path := task.Args[1] + + uft := newTask(unpinFileTask, []string{bucket, path}) + uft.Parallelizable = true + s.enqueueTask(uft, s.filePinningQueue) + + s.notifySyncNeeded() // TODO: Remove file from mirror return nil @@ -61,6 +78,11 @@ func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error { path := task.Args[1] err := s.uploadFileToRemote(ctx, bucket, path) + s.setMirrorFileBackup(ctx, path, bucket, false) + + if s.eventNotifier != nil { + s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupReady, nil)) + } return err } diff --git a/core/textile/textile.go b/core/textile/textile.go index 15e288da..9cea0ca5 100644 --- a/core/textile/textile.go +++ b/core/textile/textile.go @@ -10,6 +10,7 @@ import ( "github.com/FleekHQ/space-daemon/core/space/domain" "github.com/FleekHQ/space-daemon/core/textile/bucket" "github.com/FleekHQ/space-daemon/core/textile/model" + "github.com/FleekHQ/space-daemon/core/textile/sync" "github.com/libp2p/go-libp2p-core/crypto" "github.com/textileio/go-threads/db" @@ -58,6 +59,7 @@ type Client interface { RejectSharedFilesInvitation(ctx context.Context, invitation domain.Invitation) (domain.Invitation, error) RemoveKeys() error AttachMailboxNotifier(notif GrpcMailboxNotifier) + AttachSynchronizerNotifier(notif sync.EventNotifier) GetReceivedFiles(ctx context.Context, accepted bool, seek string, limit int) ([]*domain.SharedDirEntry, string, error) GetPathAccessRoles(ctx context.Context, b Bucket, path string) ([]domain.Member, error) GetPublicShareBucket(ctx context.Context) (Bucket, error) diff --git a/grpc/handlers.go b/grpc/handlers.go index 4fde3bc7..1a311675 100644 --- a/grpc/handlers.go +++ b/grpc/handlers.go @@ -15,7 +15,7 @@ var errNotImplemented = errors.New("Not implemented") func (srv *grpcServer) sendFileEvent(event *pb.FileEventResponse) { if srv.fileEventStream != nil { - log.Info("sending events to client") + log.Info("sending events to client", event.String()) srv.fileEventStream.Send(event) } } @@ -75,6 +75,7 @@ func (srv *grpcServer) ListDirectories(ctx context.Context, request *pb.ListDire Members: members, IsLocallyAvailable: e.LocallyAvailable, BackupCount: int64(backupCount), + IsBackupInProgress: e.BackupInProgress, } dirEntries = append(dirEntries, dirEntry) } @@ -124,6 +125,7 @@ func (srv *grpcServer) ListDirectory( Members: members, BackupCount: int64(backupCount), IsLocallyAvailable: e.LocallyAvailable, + IsBackupInProgress: e.BackupInProgress, } dirEntries = append(dirEntries, dirEntry) } diff --git a/grpc/pb/space.pb.go b/grpc/pb/space.pb.go index 86156ad2..fad7fb67 100644 --- a/grpc/pb/space.pb.go +++ b/grpc/pb/space.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.6.1 +// protoc v3.12.2 // source: space.proto package pb @@ -704,6 +704,7 @@ type ListDirectoryEntry struct { IsLocallyAvailable bool `protobuf:"varint,9,opt,name=isLocallyAvailable,proto3" json:"isLocallyAvailable,omitempty"` BackupCount int64 `protobuf:"varint,10,opt,name=backupCount,proto3" json:"backupCount,omitempty"` Members []*FileMember `protobuf:"bytes,11,rep,name=members,proto3" json:"members,omitempty"` + IsBackupInProgress bool `protobuf:"varint,12,opt,name=isBackupInProgress,proto3" json:"isBackupInProgress,omitempty"` } func (x *ListDirectoryEntry) Reset() { @@ -815,6 +816,13 @@ func (x *ListDirectoryEntry) GetMembers() []*FileMember { return nil } +func (x *ListDirectoryEntry) GetIsBackupInProgress() bool { + if x != nil { + return x.IsBackupInProgress + } + return false +} + type SharedListDirectoryEntry struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -4526,7 +4534,7 @@ var file_space_proto_rawDesc = []byte{ 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, - 0x65, 0x73, 0x73, 0x22, 0xe9, 0x02, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x44, 0x69, 0x72, 0x65, + 0x65, 0x73, 0x73, 0x22, 0x99, 0x03, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x73, 0x44, 0x69, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x69, @@ -4548,7 +4556,10 @@ var file_space_proto_rawDesc = []byte{ 0x0a, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x2b, 0x0a, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2e, 0x46, 0x69, 0x6c, 0x65, - 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x22, + 0x4d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x07, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x73, 0x12, + 0x2e, 0x0a, 0x12, 0x69, 0x73, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x49, 0x6e, 0x50, 0x72, 0x6f, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x73, 0x42, + 0x61, 0x63, 0x6b, 0x75, 0x70, 0x49, 0x6e, 0x50, 0x72, 0x6f, 0x67, 0x72, 0x65, 0x73, 0x73, 0x22, 0x77, 0x0a, 0x18, 0x53, 0x68, 0x61, 0x72, 0x65, 0x64, 0x4c, 0x69, 0x73, 0x74, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x2f, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x73, 0x70, 0x61, diff --git a/grpc/proto/space.proto b/grpc/proto/space.proto index 05f95c8c..7d6d6c29 100644 --- a/grpc/proto/space.proto +++ b/grpc/proto/space.proto @@ -357,6 +357,7 @@ message ListDirectoryEntry { bool isLocallyAvailable = 9; int64 backupCount = 10; repeated FileMember members = 11; + bool isBackupInProgress = 12; } message SharedListDirectoryEntry { diff --git a/mocks/Client.go b/mocks/Client.go index af2d45c2..30e7fc1b 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -21,6 +21,8 @@ import ( model "github.com/FleekHQ/space-daemon/core/textile/model" + sync "github.com/FleekHQ/space-daemon/core/textile/sync" + textile "github.com/FleekHQ/space-daemon/core/textile" usersclient "github.com/textileio/textile/api/users/client" @@ -57,6 +59,11 @@ func (_m *Client) AttachMailboxNotifier(notif textile.GrpcMailboxNotifier) { _m.Called(notif) } +// AttachSynchronizerNotifier provides a mock function with given fields: notif +func (_m *Client) AttachSynchronizerNotifier(notif sync.EventNotifier) { + _m.Called(notif) +} + // CreateBucket provides a mock function with given fields: ctx, bucketSlug func (_m *Client) CreateBucket(ctx context.Context, bucketSlug string) (textile.Bucket, error) { ret := _m.Called(ctx, bucketSlug) diff --git a/swagger/ui/space.swagger.json b/swagger/ui/space.swagger.json index 041d4997..cc374906 100644 --- a/swagger/ui/space.swagger.json +++ b/swagger/ui/space.swagger.json @@ -1858,6 +1858,10 @@ "items": { "$ref": "#/definitions/spaceFileMember" } + }, + "isBackupInProgress": { + "type": "boolean", + "format": "boolean" } } },