Merge pull request #206 from FleekHQ/develop
Release develop to master
jsonsivar authored Oct 8, 2020
2 parents 65030e2 + e4db554 commit 1bef0c3
Showing 24 changed files with 234 additions and 71 deletions.
1 change: 1 addition & 0 deletions app/app.go
@@ -178,6 +178,7 @@ func (a *App) Start(ctx context.Context) error {
)

textileClient.AttachMailboxNotifier(srv)
textileClient.AttachSynchronizerNotifier(srv)
err = a.RunAsync("BucketSync", bucketSync, func() error {
bucketSync.RegisterNotifier(srv)
return bucketSync.Start(ctx)
24 changes: 14 additions & 10 deletions core/events/events.go
@@ -9,9 +9,11 @@ import (
type FileEventType string

const (
FileAdded FileEventType = "FileAdded"
FileDeleted FileEventType = "FileDeleted"
FileUpdated FileEventType = "FileUpdated"
FileAdded FileEventType = "FileAdded"
FileDeleted FileEventType = "FileDeleted"
FileUpdated FileEventType = "FileUpdated"
FileBackupInProgress FileEventType = "FileBackupInProgress"
FileBackupReady FileEventType = "FileBackupReady"

FolderAdded FileEventType = "FolderAdded"
FolderDeleted FileEventType = "FolderDeleted"
@@ -20,16 +22,18 @@ const (
)

type FileEvent struct {
Path string
Info os.FileInfo
Type FileEventType
Path string
Bucket string
Info os.FileInfo
Type FileEventType
}

func NewFileEvent(path string, eventType FileEventType, info os.FileInfo) FileEvent {
func NewFileEvent(path, bucket string, eventType FileEventType, info os.FileInfo) FileEvent {
return FileEvent{
Path: path,
Type: eventType,
Info: info,
Path: path,
Bucket: bucket,
Type: eventType,
Info: info,
}
}

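For illustration only (not part of this commit): with the updated signature, callers now pass the bucket slug alongside the path. A minimal sketch, assuming the space-daemon module is importable; the path, bucket name, and nil os.FileInfo below are placeholder values.

package main

import (
	"fmt"

	"github.com/FleekHQ/space-daemon/core/events"
)

func main() {
	// NewFileEvent now takes the bucket slug as its second argument; the
	// os.FileInfo argument may be nil when no stat information is available.
	ev := events.NewFileEvent("/photos/cat.jpg", "personal", events.FileBackupInProgress, nil)
	fmt.Println(ev.Bucket, ev.Path, ev.Type)
}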
10 changes: 6 additions & 4 deletions core/space/domain/domain.go
@@ -28,6 +28,7 @@ type FileInfo struct {
IpfsHash string
BackedUp bool
LocallyAvailable bool
BackupInProgress bool
}

type OpenFileInfo struct {
@@ -150,10 +151,11 @@ type APISessionTokens struct {
}

type MirrorFile struct {
Path string
BucketSlug string
Backup bool
Shared bool
Path string
BucketSlug string
Backup bool
Shared bool
BackupInProgress bool
}

type SharedDirEntry struct {
3 changes: 3 additions & 0 deletions core/space/services/services_fs.go
@@ -196,8 +196,10 @@ func (s *Space) listDirAtPath(
}

backedup := false
backupInProgress := false
if mirror_files[item.Path] != nil {
backedup = mirror_files[item.Path].Backup
backupInProgress = mirror_files[item.Path].BackupInProgress
}

locallyAvailable := false
@@ -220,6 +222,7 @@
IpfsHash: item.Cid,
BackedUp: backedup,
LocallyAvailable: locallyAvailable,
BackupInProgress: backupInProgress,
}
entries = append(entries, entry)

5 changes: 5 additions & 0 deletions core/textile/bucket/bucket.go
@@ -64,6 +64,11 @@ type BucketInterface interface {
ctx context.Context,
path string,
) (path.Resolved, error)
ItemsCount(
ctx context.Context,
path string,
withRecursive bool,
) (int32, error)
}

type Notifier interface {
35 changes: 35 additions & 0 deletions core/textile/bucket/bucket_dir.go
@@ -6,6 +6,7 @@ import (
"regexp"
"strings"

"github.com/FleekHQ/space-daemon/core/textile/utils"
"github.com/FleekHQ/space-daemon/log"
"github.com/ipfs/interface-go-ipfs-core/path"
)
@@ -80,3 +81,37 @@ func (b *Bucket) DeleteDirOrFile(ctx context.Context, path string) (path.Resolve

return b.bucketsClient.RemovePath(ctx, b.Key(), path)
}

// ItemsCount returns the items count at a path, recursing into subdirectories when withRecursive is true
func (b *Bucket) ItemsCount(ctx context.Context, path string, withRecursive bool) (int32, error) {
b.lock.RLock()
defer b.lock.RUnlock()

var count int32

dir, err := b.ListDirectory(ctx, path)
if err != nil {
return 0, err
}

count = dir.Item.ItemsCount

if withRecursive {
for _, item := range dir.Item.Items {
if utils.IsMetaFileName(item.Name) {
continue
}

if item.IsDir {
n, err := b.ItemsCount(ctx, item.Path, withRecursive)
if err != nil {
return 0, err
}

count += n
}
}
}

return count, nil
}
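An illustrative consumer of the new method (not part of this change), assuming b is an already-initialized bucket satisfying BucketInterface and that an empty path denotes the bucket root:

package example

import (
	"context"

	"github.com/FleekHQ/space-daemon/core/textile/bucket"
)

// countBucketItems is a hypothetical helper showing one way to consume the new
// method: withRecursive=true descends into every subdirectory, and metadata
// files are skipped by ItemsCount itself.
func countBucketItems(ctx context.Context, b bucket.BucketInterface) (int32, error) {
	return b.ItemsCount(ctx, "", true)
}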
4 changes: 4 additions & 0 deletions core/textile/client.go
@@ -501,3 +501,7 @@ func (tc *textileClient) requiresHubConnection() error {
}
return nil
}

func (tc *textileClient) AttachSynchronizerNotifier(notif synchronizer.EventNotifier) {
tc.sync.AttachNotifier(notif)
}
42 changes: 25 additions & 17 deletions core/textile/model/mirror_file.go
@@ -16,13 +16,13 @@ import (
)

type MirrorFileSchema struct {
ID core.InstanceID `json:"_id"`
Path string `json:"path"`
BucketSlug string `json:"bucket_slug"`
Backup bool `json:"backup"`
Shared bool `json:"shared"`

DbID string
ID core.InstanceID `json:"_id"`
Path string `json:"path"`
BucketSlug string `json:"bucket_slug"`
Backup bool `json:"backup"`
Shared bool `json:"shared"`
BackupInProgress bool `json:"backupInProgress"`
DbID string
}

type MirrorBucketSchema struct {
@@ -141,10 +141,11 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF
}

newInstance := &MirrorFileSchema{
Path: mirrorFile.Path,
BucketSlug: mirrorFile.BucketSlug,
Backup: mirrorFile.Backup,
Shared: mirrorFile.Shared,
Path: mirrorFile.Path,
BucketSlug: mirrorFile.BucketSlug,
Backup: mirrorFile.Backup,
BackupInProgress: mirrorFile.BackupInProgress,
Shared: mirrorFile.Shared,
}

instances := client.Instances{newInstance}
@@ -156,12 +157,13 @@

id := res[0]
return &MirrorFileSchema{
Path: newInstance.Path,
BucketSlug: newInstance.BucketSlug,
Backup: newInstance.Backup,
Shared: newInstance.Shared,
ID: core.InstanceID(id),
DbID: newInstance.DbID,
Path: newInstance.Path,
BucketSlug: newInstance.BucketSlug,
Backup: newInstance.Backup,
BackupInProgress: newInstance.BackupInProgress,
Shared: newInstance.Shared,
ID: core.InstanceID(id),
DbID: newInstance.DbID,
}, nil
}

@@ -209,5 +211,11 @@ func (m *model) initMirrorFileModel(ctx context.Context) (context.Context, *thre
}},
})

// Migrates the db by adding any fields introduced since an older version of the daemon
m.threads.UpdateCollection(metaCtx, *dbID, db.CollectionConfig{
Name: mirrorFileModelName,
Schema: util.SchemaFromInstance(&MirrorFileSchema{}, false),
})

return metaCtx, dbID, nil
}
4 changes: 4 additions & 0 deletions core/textile/sharing.go
@@ -199,6 +199,10 @@ func (tc *textileClient) GetReceivedFiles(ctx context.Context, accepted bool, se
IpfsHash: ipfsHash,
LocallyAvailable: false,
BackedUp: true,

// TODO: Reflect correct state when we add local updates syncing to remote
BackupInProgress: false,

DirEntry: domain.DirEntry{
Path: file.Path,
IsDir: isDir,
14 changes: 8 additions & 6 deletions core/textile/sync/mirror.go
@@ -17,14 +17,15 @@ import (

const mirrorThreadKeyName = "mirrorV1"

func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string) error {
func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string, isInProgress bool) error {
mf, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug)
if err != nil {
return err
}
if mf != nil {
// update
mf.Backup = true
mf.Backup = !isInProgress
mf.BackupInProgress = isInProgress

_, err = s.model.UpdateMirrorFile(ctx, mf)
if err != nil {
@@ -33,10 +34,11 @@ func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug
} else {
// create
mf := &domain.MirrorFile{
Path: path,
BucketSlug: bucketSlug,
Backup: true,
Shared: false,
Path: path,
BucketSlug: bucketSlug,
Backup: !isInProgress,
BackupInProgress: isInProgress,
Shared: false,
}

_, err := s.model.CreateMirrorFile(ctx, mf)
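A hypothetical two-phase usage sketch of the new isInProgress flag, written as if it lived inside the sync package (the helper name and flow are assumptions, not code from this commit):

// backupWithProgress is illustrative only; the helper name and flow are assumptions.
func (s *synchronizer) backupWithProgress(ctx context.Context, path, bucketSlug string, upload func() error) error {
	// Record Backup=false / BackupInProgress=true before the upload starts.
	if err := s.setMirrorFileBackup(ctx, path, bucketSlug, true); err != nil {
		return err
	}
	if err := upload(); err != nil {
		return err
	}
	// On success, flip the mirror file to Backup=true / BackupInProgress=false.
	return s.setMirrorFileBackup(ctx, path, bucketSlug, false)
}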
7 changes: 7 additions & 0 deletions core/textile/sync/sync.go
@@ -2,8 +2,14 @@ package sync

import (
"context"

"github.com/FleekHQ/space-daemon/core/events"
)

type EventNotifier interface {
SendFileEvent(event events.FileEvent)
}

type Synchronizer interface {
NotifyItemAdded(bucket, path string)
NotifyItemRemoved(bucket, path string)
@@ -14,4 +20,5 @@ type Synchronizer interface {
RestoreQueue() error
Shutdown()
String() string
AttachNotifier(EventNotifier)
}
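For illustration, a minimal EventNotifier implementation that just logs events; in the daemon the gRPC server fills this role (see the AttachMailboxNotifier/AttachSynchronizerNotifier calls in app/app.go above). The type below is an assumption, not part of this change; anything with a SendFileEvent method satisfies the interface and can be handed to AttachNotifier.

package example

import (
	"fmt"

	"github.com/FleekHQ/space-daemon/core/events"
)

// loggingNotifier is a hypothetical EventNotifier that just prints each event.
type loggingNotifier struct{}

func (n *loggingNotifier) SendFileEvent(event events.FileEvent) {
	fmt.Printf("file event: %s %s%s\n", event.Type, event.Bucket, event.Path)
}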
4 changes: 2 additions & 2 deletions core/textile/sync/sync_test.go
@@ -69,15 +69,15 @@ func TestSync_ProcessTask(t *testing.T) {

// Makes the processAddItem and processPinFile fail right away
mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error"))
mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error"))
// mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error"))

mockStore.On("Set", mock.Anything, mock.Anything).Return(nil)

s.Start(ctx)

s.Shutdown()

expectedState := "Textile sync [file pinning]: Total: 1, Queued: 1, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n"
expectedState := "Textile sync [file pinning]: Total: 0, Queued: 0, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n"

assert.Equal(t, expectedState, s.String())
mockModel.AssertExpectations(t)
13 changes: 5 additions & 8 deletions core/textile/sync/synchronizer.go
@@ -47,6 +47,7 @@ type synchronizer struct {
cfg config.Config
netc *nc.Client
queueWg *sync.WaitGroup
eventNotifier EventNotifier
}

// Creates a new Synchronizer
@@ -103,10 +104,6 @@ func (s *synchronizer) NotifyItemAdded(bucket, path string) {
t := newTask(addItemTask, []string{bucket, path})
s.enqueueTask(t, s.taskQueue)

pft := newTask(pinFileTask, []string{bucket, path})
pft.Parallelizable = true
s.enqueueTask(pft, s.filePinningQueue)

s.notifySyncNeeded()
}

@@ -115,10 +112,6 @@ func (s *synchronizer) NotifyItemRemoved(bucket, path string) {
t := newTask(removeItemTask, []string{bucket, path})
s.enqueueTask(t, s.taskQueue)

uft := newTask(unpinFileTask, []string{bucket, path})
uft.Parallelizable = true
s.enqueueTask(uft, s.filePinningQueue)

s.notifySyncNeeded()
}

@@ -348,3 +341,7 @@ func (s *synchronizer) sync(ctx context.Context, queue *list.List) error {

return nil
}

func (s *synchronizer) AttachNotifier(notif EventNotifier) {
s.eventNotifier = notif
}
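The attached notifier is how the synchronizer can surface the new FileBackupInProgress/FileBackupReady events; the actual emit sites are not part of this diff. A hypothetical helper sketching that flow, written as if it sat in the sync package with the events package imported:

// notifyFileBackupState is illustrative only and not part of this commit.
func (s *synchronizer) notifyFileBackupState(bucket, path string, inProgress bool) {
	if s.eventNotifier == nil {
		return
	}
	eventType := events.FileBackupReady
	if inProgress {
		eventType = events.FileBackupInProgress
	}
	// os.FileInfo is nil here; stat information may not exist for remote-only files.
	s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, eventType, nil))
}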
