diff --git a/core/space/services/services_fs.go b/core/space/services/services_fs.go index 5c28ba65..2a51386f 100644 --- a/core/space/services/services_fs.go +++ b/core/space/services/services_fs.go @@ -103,23 +103,11 @@ func (s *Space) ToggleBucketBackup(ctx context.Context, bucketName string, bucke return err } - b, err := s.tc.GetBucket(ctx, bucketName, nil) + _, err = s.tc.GetBucket(ctx, bucketName, nil) if err != nil { return err } - if bucketBackup == true { - _, err = s.tc.BackupBucket(ctx, b) - if err != nil { - return err - } - } else { - _, err = s.tc.UnbackupBucket(ctx, b) - if err != nil { - return err - } - } - return nil } @@ -663,15 +651,6 @@ func (s *Space) addFile(ctx context.Context, sourcePath string, targetPath strin return domain.AddItemResult{}, err } - if s.tc.IsBucketBackup(ctx, b.Slug()) && !s.tc.IsMirrorFile(ctx, targetPathBucket, b.Slug()) { - f.Seek(0, io.SeekStart) - - err = s.tc.BackupFileWithReader(ctx, b, targetPathBucket, f) - if err != nil { - return domain.AddItemResult{}, err - } - } - fi, err := f.Stat() var fileSize int64 = 0 if err == nil { diff --git a/core/space/space_test.go b/core/space/space_test.go index 9b801603..bfbe6053 100644 --- a/core/space/space_test.go +++ b/core/space/space_test.go @@ -394,11 +394,6 @@ func TestService_AddItems_FilesOnly(t *testing.T) { "Slug", ).Return("personal") - textileClient.On( - "IsBucketBackup", - mock.Anything, mock.Anything, - ).Return(false) - mockPath.On("String").Return("hash") for _, f := range testSourcePaths { @@ -475,11 +470,6 @@ func TestService_AddItems_Folder(t *testing.T) { "Slug", ).Return("personal") - textileClient.On( - "IsBucketBackup", - mock.Anything, mock.Anything, - ).Return(false) - ch, res, err := sv.AddItems(context.Background(), testSourcePaths, bucketPath, "") assert.Nil(t, err) diff --git a/core/textile/backup.go b/core/textile/backup.go deleted file mode 100644 index 52bcc314..00000000 --- a/core/textile/backup.go +++ /dev/null @@ -1,291 +0,0 @@ -package textile - -import ( - "context" - "fmt" - "io" - "strings" - "sync" - - "github.com/FleekHQ/space-daemon/log" - - "github.com/textileio/go-threads/core/thread" -) - -// backup a single file with an io.Reader provided -func (tc *textileClient) BackupFileWithReader(ctx context.Context, bucket Bucket, path string, reader io.Reader) (err error) { - - if _, _, err = tc.UploadFileToHub(ctx, bucket, path, reader); err != nil { - log.Error(fmt.Sprintf("error backuping up file with reader (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return err - } - - if err = tc.setMirrorFileBackup(ctx, path, bucket.Slug()); err != nil { - log.Error(fmt.Sprintf("error setting mirror file as backup (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return err - } - - return nil -} - -// backup a single file with a path provided -func (tc *textileClient) backupFile(ctx context.Context, bucket Bucket, path string) error { - - errc := make(chan error, 1) - pipeReader, pipeWriter := io.Pipe() - - // go routine for piping - go func() { - defer close(errc) - defer pipeWriter.Close() - - err := bucket.GetFile(ctx, path, pipeWriter) - if err != nil { - errc <- err - log.Error(fmt.Sprintf("error getting file (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return - } - - errc <- nil - - }() - - if err := tc.BackupFileWithReader(ctx, bucket, path, pipeReader); err != nil { - log.Error(fmt.Sprintf("error backing up file (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return err - } - - if err := <-errc; err != nil { - return err - } - - return nil -} - -// backup all files in a bucket -func (tc *textileClient) backupBucketFiles(ctx context.Context, bucket Bucket, path string) (int, error) { - var wg sync.WaitGroup - var count int - - bucketSlug := bucket.Slug() - - // XXX: we ignore errc (no atomicity at all) for now but we should return - // XXX: the errors, perhaps in a separate async call - errc := make(chan error) - - dir, err := bucket.ListDirectory(ctx, path) - if err != nil { - return 0, err - } - - for _, item := range dir.Item.Items { - if item.Name == ".textileseed" || item.Name == ".textile" { - continue - } - - if item.IsDir { - p := strings.Join([]string{path, item.Name}, "/") - - n, err := tc.backupBucketFiles(ctx, bucket, p) - if err != nil { - return count, err - } - - count += n - continue - } - - wg.Add(1) - - // parallelize the backups - go func(path string) { - defer wg.Done() - - // skip already set as backup - if tc.isMirrorBackupFile(ctx, bucketSlug, path) { - return - } - if err = tc.backupFile(ctx, bucket, path); err != nil { - select { - case errc <- err: - // make sure no block - default: - } - - return - } - - count += 1 - }(item.Path) - } - - wg.Wait() - - return count, nil -} - -// meta, key and share with me thread ids -func (tc *textileClient) userThreadIds(ctx context.Context, bucket Bucket) ([]thread.ID, error) { - dbIds := make([]thread.ID, 0) - - // key - bucketThreadID, err := bucket.GetThreadID(ctx) - if err != nil { - return nil, err - } - dbIds = append(dbIds, *bucketThreadID) - - // shared with me - publicShareId, err := tc.getPublicShareThread(ctx) - if err != nil { - return nil, err - } - dbIds = append(dbIds, publicShareId) - - return dbIds, nil -} - -// replicate meta, key and shared with me threads -func (tc *textileClient) replicateThreadsToHub(ctx context.Context, bucket Bucket) (count int, err error) { - replicatedDbIds := make([]thread.ID, 0) - - // poor man's transactionality - defer func() { - if err != nil { - for _, dbId := range replicatedDbIds { - err = tc.DereplicateThreadFromHub(ctx, &dbId) - if err != nil { - log.Error(fmt.Sprintf("failed to dereplicate thread (dbId=%+v b.Slug=%+v)", dbId, bucket.Slug()), err) - } - } - } - }() - - userThreadIds, err := tc.userThreadIds(ctx, bucket) - if err != nil { - return 0, err - } - - for _, dbId := range userThreadIds { - if err = tc.ReplicateThreadToHub(ctx, &dbId); err != nil { - return 0, err - } - - replicatedDbIds = append(replicatedDbIds, dbId) - } - - return len(replicatedDbIds), nil -} - -// backup the bucket -func (tc *textileClient) BackupBucket(ctx context.Context, bucket Bucket) (count int, err error) { - - count, err = tc.backupBucketFiles(ctx, bucket, "") - if err != nil { - return 0, nil - } - - if _, err = tc.replicateThreadsToHub(ctx, bucket); err != nil { - return 0, nil - } - - return count, nil -} - -// unbackup a single file -func (tc *textileClient) unbackupFile(ctx context.Context, bucket Bucket, path string) (err error) { - - if err = tc.unsetMirrorFileBackup(ctx, path, bucket.Slug()); err != nil { - log.Error(fmt.Sprintf("error unsetting mirror file as backup (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return err - } - - if err = tc.deleteFileFromHub(ctx, bucket, path); err != nil { - log.Error(fmt.Sprintf("error backuping up file with reader (path=%+v b.Slug=%+v)", path, bucket.Slug()), err) - return err - } - - return nil -} - -// unbackup all files in a bucket -func (tc *textileClient) unbackupBucketFiles(ctx context.Context, bucket Bucket, path string) (int, error) { - var wg sync.WaitGroup - var count int - - dir, err := bucket.ListDirectory(ctx, path) - if err != nil { - return 0, err - } - - for _, item := range dir.Item.Items { - if item.Name == ".textileseed" || item.Name == ".textile" { - continue - } - - if item.IsDir { - p := strings.Join([]string{path, item.Name}, "/") - - n, err := tc.unbackupBucketFiles(ctx, bucket, p) - if err != nil { - return count, err - } - - count += n - continue - } - - wg.Add(1) - - go func(path string) { - defer wg.Done() - - if tc.isSharedFile(ctx, bucket, path) { - return - } - if err = tc.unbackupFile(ctx, bucket, path); err != nil { - return - } - - count += 1 - }(item.Path) - } - - wg.Wait() - return count, nil -} - -// dereplicate meta, key and shared with me threads -func (tc *textileClient) dereplicateThreadsToHub(ctx context.Context, bucket Bucket) (int, error) { - var count int - - userThreadIds, err := tc.userThreadIds(ctx, bucket) - if err != nil { - return 0, nil - } - - for _, dbId := range userThreadIds { - if err = tc.DereplicateThreadFromHub(ctx, &dbId); err != nil { - return 0, err - } - - count += 1 - } - - return count, nil -} - -// unbackup the bucket -func (tc *textileClient) UnbackupBucket(ctx context.Context, bucket Bucket) (count int, err error) { - - count, err = tc.unbackupBucketFiles(ctx, bucket, "") - if err != nil { - return 0, nil - } - - if _, err = tc.dereplicateThreadsToHub(ctx, bucket); err != nil { - return 0, nil - } - - return count, nil -} diff --git a/core/textile/bucket/bucket.go b/core/textile/bucket/bucket.go index b0e31cf1..3c9d074f 100644 --- a/core/textile/bucket/bucket.go +++ b/core/textile/bucket/bucket.go @@ -9,6 +9,7 @@ import ( "github.com/textileio/go-threads/core/thread" bucketsClient "github.com/textileio/textile/api/buckets/client" bucketsproto "github.com/textileio/textile/api/buckets/pb" + "github.com/textileio/textile/buckets" ) type BucketData struct { @@ -29,6 +30,44 @@ type BucketsClient interface { ListPath(ctx context.Context, key, pth string) (*bucketsproto.ListPathResponse, error) RemovePath(ctx context.Context, key, pth string, opts ...bucketsClient.Option) (path.Resolved, error) ListIpfsPath(ctx context.Context, ipfsPath path.Path) (*bucketsproto.ListIpfsPathResponse, error) + PushPathAccessRoles(ctx context.Context, key, path string, roles map[string]buckets.Role) error +} + +type BucketInterface interface { + Slug() string + Key() string + GetData() BucketData + GetContext(ctx context.Context) (context.Context, *thread.ID, error) + GetClient() BucketsClient + GetThreadID(ctx context.Context) (*thread.ID, error) + DirExists(ctx context.Context, path string) (bool, error) + FileExists(ctx context.Context, path string) (bool, error) + UploadFile( + ctx context.Context, + path string, + reader io.Reader, + ) (result path.Resolved, root path.Path, err error) + GetFile( + ctx context.Context, + path string, + w io.Writer, + ) error + CreateDirectory( + ctx context.Context, + path string, + ) (result path.Resolved, root path.Path, err error) + ListDirectory( + ctx context.Context, + path string, + ) (*DirEntries, error) + DeleteDirOrFile( + ctx context.Context, + path string, + ) (path.Resolved, error) +} + +type Notifier interface { + OnUploadFile(bucketSlug string, bucketPath string, result path.Resolved, root path.Path) } // NOTE: all write operations should use the lock for the bucket to keep consistency @@ -39,6 +78,7 @@ type Bucket struct { root *bucketsproto.Root bucketsClient BucketsClient getBucketContext GetBucketContextFn + notifier Notifier } func (b *Bucket) Slug() string { @@ -56,6 +96,7 @@ func New( root: root, bucketsClient: bucketsClient, getBucketContext: getBucketContext, + notifier: nil, } } @@ -74,15 +115,23 @@ func (b *Bucket) GetData() BucketData { } } -func (b *Bucket) getContext(ctx context.Context) (context.Context, *thread.ID, error) { +func (b *Bucket) GetContext(ctx context.Context) (context.Context, *thread.ID, error) { return b.getBucketContext(ctx, b.root.Name) } +func (b *Bucket) GetClient() BucketsClient { + return b.bucketsClient +} + func (b *Bucket) GetThreadID(ctx context.Context) (*thread.ID, error) { - _, threadID, err := b.getContext(ctx) + _, threadID, err := b.GetContext(ctx) if err != nil { return nil, err } return threadID, nil } + +func (b *Bucket) AttachNotifier(n Notifier) { + b.notifier = n +} diff --git a/core/textile/bucket/bucket_dir.go b/core/textile/bucket/bucket_dir.go index 3ba36f47..fc9443a5 100644 --- a/core/textile/bucket/bucket_dir.go +++ b/core/textile/bucket/bucket_dir.go @@ -41,7 +41,7 @@ func (b *Bucket) DirExists(ctx context.Context, path string) (bool, error) { func (b *Bucket) CreateDirectory(ctx context.Context, path string) (result path.Resolved, root path.Path, err error) { b.lock.Lock() defer b.lock.Unlock() - ctx, _, err = b.getContext(ctx) + ctx, _, err = b.GetContext(ctx) if err != nil { return nil, nil, err } @@ -55,7 +55,7 @@ func (b *Bucket) CreateDirectory(ctx context.Context, path string) (result path. func (b *Bucket) ListDirectory(ctx context.Context, path string) (*DirEntries, error) { b.lock.RLock() defer b.lock.RUnlock() - ctx, _, err := b.getContext(ctx) + ctx, _, err := b.GetContext(ctx) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (b *Bucket) DeleteDirOrFile(ctx context.Context, path string) (path.Resolve b.lock.Lock() defer b.lock.Unlock() - ctx, _, err := b.getContext(ctx) + ctx, _, err := b.GetContext(ctx) if err != nil { return nil, err } diff --git a/core/textile/bucket/bucket_file.go b/core/textile/bucket/bucket_file.go index 4b663aaa..df092557 100644 --- a/core/textile/bucket/bucket_file.go +++ b/core/textile/bucket/bucket_file.go @@ -14,7 +14,7 @@ func (b *Bucket) FileExists(ctx context.Context, pth string) (bool, error) { b.lock.RLock() defer b.lock.RUnlock() - ctx, _, err := b.getContext(ctx) + ctx, _, err := b.GetContext(ctx) if err != nil { return false, err } @@ -44,11 +44,21 @@ func (b *Bucket) FileExists(ctx context.Context, pth string) (bool, error) { func (b *Bucket) UploadFile(ctx context.Context, path string, reader io.Reader) (result path.Resolved, root path.Path, err error) { b.lock.Lock() defer b.lock.Unlock() - ctx, _, err = b.getContext(ctx) + ctx, _, err = b.GetContext(ctx) if err != nil { return nil, nil, err } - return b.bucketsClient.PushPath(ctx, b.Key(), path, reader) + + result, root, err = b.bucketsClient.PushPath(ctx, b.Key(), path, reader) + if err != nil { + return nil, nil, err + } + + if b.notifier != nil { + b.notifier.OnUploadFile(b.Slug(), path, result, root) + } + + return result, root, nil } // GetFile pulls path from bucket writing it to writer if it's a file. @@ -56,7 +66,7 @@ func (b *Bucket) GetFile(ctx context.Context, path string, w io.Writer) error { b.lock.RLock() defer b.lock.RUnlock() - ctx, _, err := b.getContext(ctx) + ctx, _, err := b.GetContext(ctx) if err != nil { return err } diff --git a/core/textile/bucket/crypto/crypto.go b/core/textile/bucket/crypto/crypto.go index 32e0a4d8..744d0c53 100644 --- a/core/textile/bucket/crypto/crypto.go +++ b/core/textile/bucket/crypto/crypto.go @@ -7,8 +7,6 @@ import ( "io" "io/ioutil" "strings" - - "github.com/FleekHQ/space-daemon/log" ) func parseKeys(key []byte) (aesKey, iv, hmacKey []byte, err error) { @@ -87,7 +85,6 @@ func EncryptPathItems(key []byte, path string, plainReader io.Reader) (string, i // NOTE: key must be a 64 byte long key func DecryptPathItems(key []byte, path string, encryptedReader io.Reader) (string, io.ReadCloser, error) { // decrypt path - log.Debug("Decrypting Path Items", "path:"+path) aesKey, iv, hmacKey, err := parseKeys(key) if err != nil { return "", nil, err diff --git a/core/textile/bucket_factory.go b/core/textile/bucket_factory.go index e9bc90e3..bfbf162d 100644 --- a/core/textile/bucket_factory.go +++ b/core/textile/bucket_factory.go @@ -19,6 +19,7 @@ import ( "github.com/textileio/go-threads/db" bc "github.com/textileio/textile/api/buckets/client" buckets_pb "github.com/textileio/textile/api/buckets/pb" + "github.com/textileio/textile/cmd" tdb "github.com/textileio/textile/threaddb" ) @@ -69,6 +70,30 @@ func (tc *textileClient) getBucket(ctx context.Context, slug string, remoteFile ), ) + // Attach a notifier if the bucket is local + // So that local ops can be synced to the remote node + if remoteFile == nil && tc.notifier != nil { + b.AttachNotifier(tc.notifier) + } + + return b, nil +} + +func (tc *textileClient) getBucketForMirror(ctx context.Context, slug string) (Bucket, error) { + root, getContextFn, newSlug, err := tc.getBucketRootForMirror(ctx, slug) + if err != nil { + return nil, err + } + + b := bucket.New( + root, + getContextFn, + NewSecureBucketsClient( + tc.hb, + newSlug, + ), + ) + return b, nil } @@ -77,8 +102,6 @@ func (tc *textileClient) GetDefaultBucket(ctx context.Context) (Bucket, error) { } func (tc *textileClient) getBucketContext(ctx context.Context, sDbID string, bucketSlug string, ishub bool, enckey []byte) (context.Context, *thread.ID, error) { - log.Debug("getBucketContext: Getting bucket context with dbid:" + sDbID) - dbID, err := utils.ParseDbIDFromString(sDbID) if err != nil { log.Error("Error casting thread id", err) @@ -97,9 +120,6 @@ func (tc *textileClient) getBucketContext(ctx context.Context, sDbID string, buc // Returns a context that works for accessing a bucket func (tc *textileClient) getOrCreateBucketContext(ctx context.Context, bucketSlug string) (context.Context, *thread.ID, error) { - log.Debug("getOrCreateBucketContext: Getting bucket context") - - log.Debug("getOrCreateBucketContext: Fetching thread id from meta store") m := tc.GetModel() bucketSchema, notFoundErr := m.FindBucket(ctx, bucketSlug) @@ -133,7 +153,6 @@ func (tc *textileClient) getOrCreateBucketContext(ctx context.Context, bucketSlu if err != nil { return nil, nil, err } - log.Debug("getOrCreateBucketContext: Returning bucket context") return bucketCtx, &dbID, err } @@ -201,6 +220,39 @@ func (tc *textileClient) getBucketRootFromReceivedFile(ctx context.Context, file return nil, nil, NotFound(receivedFile.Bucket) } +func (tc *textileClient) getBucketRootForMirror(ctx context.Context, slug string) (*buckets_pb.Root, bucket.GetBucketContextFn, string, error) { + bucket, err := tc.GetModel().FindBucket(ctx, slug) + if err != nil { + return nil, nil, "", err + } + + getCtxFn := func(ctx context.Context, slug string) (context.Context, *thread.ID, error) { + return tc.getBucketContext(ctx, bucket.RemoteDbID, bucket.RemoteBucketSlug, true, bucket.EncryptionKey) + } + + remoteCtx, _, err := getCtxFn(ctx, bucket.RemoteBucketSlug) + if err != nil { + return nil, nil, "", err + } + + sbs := NewSecureBucketsClient( + tc.hb, + bucket.RemoteBucketSlug, + ) + + b, err := sbs.ListPath(remoteCtx, bucket.RemoteBucketKey, "") + + if err != nil { + return nil, nil, "", err + } + + if b != nil { + return b.GetRoot(), getCtxFn, bucket.RemoteBucketSlug, nil + } + + return nil, nil, "", NotFound(bucket.RemoteBucketSlug) +} + func (tc *textileClient) getBucketRootFromSlug(ctx context.Context, slug string) (context.Context, *buckets_pb.Root, error) { ctx, _, err := tc.getOrCreateBucketContext(ctx, slug) if err != nil { @@ -259,17 +311,7 @@ func (tc *textileClient) createBucket(ctx context.Context, bucketSlug string) (B return nil, err } - mirrorSchema, err := tc.createMirrorBucket(ctx, *schema) - if err != nil { - return nil, err - } - - if mirrorSchema != nil { - _, err = m.CreateMirrorBucket(ctx, bucketSlug, mirrorSchema) - if err != nil { - return nil, err - } - } + tc.sync.NotifyBucketCreated(schema.Slug, schema.EncryptionKey) newB := bucket.New( b.Root, @@ -293,7 +335,12 @@ func (tc *textileClient) ShareBucket(ctx context.Context, bucketSlug string) (*d b, err := tc.threads.GetDBInfo(ctx, *dbID) // replicate to the hub - if err := tc.ReplicateThreadToHub(ctx, dbID); err != nil { + hubma := tc.cfg.GetString(config.TextileHubMa, "") + if hubma == "" { + return nil, fmt.Errorf("no textile hub set") + } + + if _, err := tc.netc.AddReplicator(ctx, *dbID, cmd.AddrFromStr(hubma)); err != nil { log.Error("Unable to replicate on the hub: ", err) // proceeding still because local/public IP // addresses could be used to join thread @@ -374,6 +421,12 @@ func (tc *textileClient) ToggleBucketBackup(ctx context.Context, bucketSlug stri return false, err } + if bucketSchema.Backup { + tc.sync.NotifyBucketBackupOn(bucketSlug) + } else { + tc.sync.NotifyBucketBackupOff(bucketSlug) + } + return bucketSchema.Backup, nil } diff --git a/core/textile/client.go b/core/textile/client.go index 21cae5e9..2c6a614b 100644 --- a/core/textile/client.go +++ b/core/textile/client.go @@ -13,8 +13,11 @@ import ( "github.com/FleekHQ/space-daemon/core/keychain" db "github.com/FleekHQ/space-daemon/core/store" + "github.com/FleekHQ/space-daemon/core/textile/bucket" "github.com/FleekHQ/space-daemon/core/textile/hub" "github.com/FleekHQ/space-daemon/core/textile/model" + "github.com/FleekHQ/space-daemon/core/textile/notifier" + synchronizer "github.com/FleekHQ/space-daemon/core/textile/sync" "github.com/FleekHQ/space-daemon/core/util/address" "github.com/FleekHQ/space-daemon/log" threadsClient "github.com/textileio/go-threads/api/client" @@ -55,6 +58,8 @@ type textileClient struct { hubAuth hub.HubAuth mbNotifier GrpcMailboxNotifier failedHealthchecks int + sync synchronizer.Synchronizer + notifier bucket.Notifier } // Creates a new Textile Client @@ -81,6 +86,8 @@ func NewClient(store db.Store, kc keychain.Keychain, hubAuth hub.HubAuth, uc Use hubAuth: hubAuth, mbNotifier: nil, failedHealthchecks: 0, + sync: nil, + notifier: nil, } } @@ -122,6 +129,28 @@ func (tc *textileClient) getHubCtx(ctx context.Context) (context.Context, error) return ctx, nil } +func (tc *textileClient) initializeSync(ctx context.Context) { + getLocalBucketFn := func(ctx context.Context, slug string) (bucket.BucketInterface, error) { + return tc.getBucket(ctx, slug, nil) + } + + getMirrorBucketFn := func(ctx context.Context, slug string) (bucket.BucketInterface, error) { + return tc.getBucketForMirror(ctx, slug) + } + + tc.sync = synchronizer.New( + tc.store, tc.GetModel(), tc.kc, tc.hubAuth, tc.hb, tc.ht, tc.netc, tc.cfg, getMirrorBucketFn, getLocalBucketFn, tc.getBucketContext, + ) + + tc.notifier = notifier.New(tc.sync) + + if err := tc.sync.RestoreQueue(); err != nil { + log.Warn("Could not restore Textile synchronizer queue. Queue will start fresh.") + } + + tc.sync.Start(ctx) +} + // Starts the Textile Client func (tc *textileClient) start(ctx context.Context, cfg config.Config) error { tc.cfg = cfg @@ -164,6 +193,8 @@ func (tc *textileClient) start(ctx context.Context, cfg config.Config) error { tc.ht = getHubThreadsClient(tc.cfg.GetString(config.TextileHubTarget, "")) tc.hb = getHubBucketClient(tc.cfg.GetString(config.TextileHubTarget, "")) + tc.initializeSync(ctx) + tc.isRunning = true tc.healthcheck(ctx) @@ -205,9 +236,6 @@ func (tc *textileClient) start(ctx context.Context, cfg config.Config) error { } } -// notreturning error rn and this helper does -// the logging if connection to hub fails, and -// we continue with startup func (tc *textileClient) checkHubConnection(ctx context.Context) error { // Get the public key to see if we have any // Reject right away if not @@ -376,6 +404,8 @@ func (tc *textileClient) Shutdown() error { return err } + tc.sync.Shutdown() + tc.bucketsClient = nil tc.threads = nil @@ -403,18 +433,13 @@ func (tc *textileClient) GetFailedHealthchecks() int { func (tc *textileClient) healthcheck(ctx context.Context) { log.Debug("Textile Client healthcheck... Start.") - // NOTE: since we check for the hub connection before the initialization - // this means that a hub connection is required to init for now. Leaving - // it like this for release and then we can have a better online vs offline - // state management work started asap in parallel (i.e., what happens if - // they are offline during init? and then what happens if they come back - // online post init and vice versa). - err := tc.checkHubConnection(ctx) - - if err == nil && tc.isInitialized == false { + if tc.isInitialized == false { + // NOTE: Initialize does not need a hub connection as remote syncing is done in a background process tc.initialize(ctx) } + tc.checkHubConnection(ctx) + switch { case tc.isInitialized == false: log.Debug("Textile Client healthcheck... Not initialized yet.") diff --git a/core/textile/mirror.go b/core/textile/mirror.go index c4615f5a..f38c5a81 100644 --- a/core/textile/mirror.go +++ b/core/textile/mirror.go @@ -3,18 +3,8 @@ package textile import ( "context" "fmt" - "io" - "github.com/FleekHQ/space-daemon/config" - "github.com/FleekHQ/space-daemon/core/space/domain" - "github.com/FleekHQ/space-daemon/core/textile/model" - "github.com/FleekHQ/space-daemon/core/textile/utils" "github.com/FleekHQ/space-daemon/log" - "github.com/ipfs/interface-go-ipfs-core/path" - "github.com/textileio/go-threads/core/thread" - "github.com/textileio/go-threads/db" - bc "github.com/textileio/textile/api/buckets/client" - "github.com/textileio/textile/buckets" ) const mirrorThreadKeyName = "mirrorV1" @@ -29,58 +19,6 @@ func (tc *textileClient) IsMirrorFile(ctx context.Context, path, bucketSlug stri } // set mirror file as backup -func (tc *textileClient) setMirrorFileBackup(ctx context.Context, path, bucketSlug string) error { - mf, err := tc.GetModel().FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug) - if err != nil { - return err - } - if mf != nil { - // update - mf.Backup = true - - _, err = tc.GetModel().UpdateMirrorFile(ctx, mf) - if err != nil { - return err - } - } else { - // create - mf := &domain.MirrorFile{ - Path: path, - BucketSlug: bucketSlug, - Backup: true, - Shared: false, - } - - _, err := tc.GetModel().CreateMirrorFile(ctx, mf) - if err != nil { - return err - } - } - - return nil -} - -// unset mirror file as backup -func (tc *textileClient) unsetMirrorFileBackup(ctx context.Context, path, bucketSlug string) error { - mf, err := tc.GetModel().FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug) - if err != nil { - return err - } - if mf == nil { - log.Warn(fmt.Sprintf("mirror file (path=%+v bucketSlug=%+v) does not exist", path, bucketSlug)) - return nil - } - - // do not delete the instance because it might be shared - mf.Backup = false - - _, err = tc.GetModel().UpdateMirrorFile(ctx, mf) - if err != nil { - return err - } - - return nil -} // return true if mirror file is a backup func (tc *textileClient) isMirrorBackupFile(ctx context.Context, path, bucketSlug string) bool { @@ -96,127 +34,3 @@ func (tc *textileClient) isMirrorBackupFile(ctx context.Context, path, bucketSlu return mf.Backup == true } - -func (tc *textileClient) addCurrentUserAsFileOwner(ctx context.Context, bucketsClient *SecureBucketClient, key, path string) error { - roles := make(map[string]buckets.Role) - pk, err := tc.kc.GetStoredPublicKey() - if err != nil { - return err - } - tpk := thread.NewLibp2pPubKey(pk) - roles[tpk.String()] = buckets.Admin - - return bucketsClient.PushPathAccessRoles(ctx, key, path, roles) -} - -func (tc *textileClient) UploadFileToHub(ctx context.Context, b Bucket, path string, reader io.Reader) (result path.Resolved, root path.Path, err error) { - // XXX: locking? - - bucket, err := tc.GetModel().FindBucket(ctx, b.Slug()) - if err != nil { - return nil, nil, err - } - - hubCtx, _, err := tc.getBucketContext(ctx, bucket.RemoteDbID, b.Slug(), true, bucket.EncryptionKey) - if err != nil { - return nil, nil, err - } - - bucketsClient := NewSecureBucketsClient( - tc.hb, - b.Slug(), - ) - - result, root, err = bucketsClient.PushPath(hubCtx, bucket.RemoteBucketKey, path, reader) - if err != nil { - return nil, nil, err - } - - err = tc.addCurrentUserAsFileOwner(hubCtx, bucketsClient, bucket.RemoteBucketKey, path) - if err != nil { - // not returning since we dont want to halt the whole process - // also acl will still work since they are the owner - // of the thread so this is more for showing members view - log.Error("Unable to push path access roles for owner", err) - } - - return result, root, nil -} - -// XXX: public in the interface as the reverse of UploadFileToHub? -func (tc *textileClient) deleteFileFromHub(ctx context.Context, b Bucket, path string) (err error) { - // XXX: locking? - - bucket, err := tc.GetModel().FindBucket(ctx, b.Slug()) - if err != nil { - return err - } - - hubCtx, _, err := tc.getBucketContext(ctx, bucket.RemoteDbID, b.Slug(), true, bucket.EncryptionKey) - if err != nil { - return err - } - - bucketsClient := NewSecureBucketsClient( - tc.hb, - b.Slug(), - ) - - _, err = bucketsClient.RemovePath(hubCtx, bucket.RemoteBucketKey, path) - if err != nil { - return err - } - - return nil -} - -// Creates a mirror bucket. -func (tc *textileClient) createMirrorBucket(ctx context.Context, schema model.BucketSchema) (*model.MirrorBucketSchema, error) { - log.Debug("Creating a new mirror bucket with slug " + defaultPersonalMirrorBucketSlug) - dbID, err := tc.createMirrorThread(ctx) - if err != nil { - return nil, err - } - hubCtx, _, err := tc.getBucketContext(ctx, utils.CastDbIDToString(*dbID), defaultPersonalMirrorBucketSlug, true, schema.EncryptionKey) - if err != nil { - return nil, err - } - - // create mirror bucket - // TODO: use bucketname + _mirror to support any local buckets not just personal - b, err := tc.hb.Create(hubCtx, bc.WithName(defaultPersonalMirrorBucketSlug), bc.WithPrivate(true)) - if err != nil { - return nil, err - } - - return &model.MirrorBucketSchema{ - RemoteDbID: utils.CastDbIDToString(*dbID), - RemoteBucketKey: b.Root.Key, - HubAddr: tc.cfg.GetString(config.TextileHubTarget, ""), - }, nil -} - -// Creates a remote hub thread for the mirror bucket -func (tc *textileClient) createMirrorThread(ctx context.Context) (*thread.ID, error) { - log.Debug("createMirrorThread: Generating a new threadID ...") - var err error - ctx, err = tc.getHubCtx(ctx) - if err != nil { - return nil, err - } - - dbID := thread.NewIDV1(thread.Raw, 32) - - managedKey, err := tc.kc.GetManagedThreadKey(mirrorThreadKeyName) - if err != nil { - log.Error("error getting managed thread key", err) - return nil, err - } - - log.Debug("createMirrorThread: Creating Thread DB for bucket at db " + dbID.String()) - if err := tc.ht.NewDB(ctx, dbID, db.WithNewManagedThreadKey(managedKey)); err != nil { - return nil, err - } - log.Debug("createMirrorThread: Thread DB Created") - return &dbID, nil -} diff --git a/core/textile/model/buckets.go b/core/textile/model/buckets.go index 3849fde9..a9c21fd3 100644 --- a/core/textile/model/buckets.go +++ b/core/textile/model/buckets.go @@ -54,9 +54,10 @@ func (m *model) CreateBucket(ctx context.Context, bucketSlug, dbID string) (*Buc Backup: true, EncryptionKey: bucketEncryptionKey, MirrorBucketSchema: &MirrorBucketSchema{ - HubAddr: "", - RemoteBucketKey: "", - RemoteDbID: "", + HubAddr: "", + RemoteBucketKey: "", + RemoteDbID: "", + RemoteBucketSlug: "", }, } @@ -76,9 +77,10 @@ func (m *model) CreateBucket(ctx context.Context, bucketSlug, dbID string) (*Buc DbID: newInstance.DbID, Backup: newInstance.Backup, MirrorBucketSchema: &MirrorBucketSchema{ - HubAddr: newInstance.MirrorBucketSchema.HubAddr, - RemoteBucketKey: newInstance.MirrorBucketSchema.RemoteBucketKey, - RemoteDbID: newInstance.MirrorBucketSchema.RemoteDbID, + HubAddr: newInstance.MirrorBucketSchema.HubAddr, + RemoteBucketKey: newInstance.MirrorBucketSchema.RemoteBucketKey, + RemoteDbID: newInstance.MirrorBucketSchema.RemoteDbID, + RemoteBucketSlug: newInstance.MirrorBucketSchema.RemoteBucketSlug, }, }, nil } @@ -138,7 +140,7 @@ func (m *model) FindBucket(ctx context.Context, bucketSlug string) (*BucketSchem if len(buckets) == 0 { return nil, errBucketNotFound } - log.Debug("Model.FindBucket: returning bucket with dbid " + buckets[0].DbID) + return buckets[0], nil } @@ -162,19 +164,15 @@ func (m *model) initBucketModel(ctx context.Context) (context.Context, *thread.I return nil, nil, err } - if err = m.threads.NewDB(metaCtx, *dbID); err != nil { - log.Debug("initBucketModel: db already exists") - } - if err := m.threads.NewCollection(metaCtx, *dbID, db.CollectionConfig{ + m.threads.NewDB(metaCtx, *dbID) + m.threads.NewCollection(metaCtx, *dbID, db.CollectionConfig{ Name: bucketModelName, Schema: util.SchemaFromInstance(&BucketSchema{}, false), Indexes: []db.Index{{ Path: "slug", Unique: true, }}, - }); err != nil { - log.Debug("initBucketModel: collection already exists") - } + }) return metaCtx, dbID, nil } diff --git a/core/textile/model/mirror_file.go b/core/textile/model/mirror_file.go index 210c0223..092f7585 100644 --- a/core/textile/model/mirror_file.go +++ b/core/textile/model/mirror_file.go @@ -26,9 +26,10 @@ type MirrorFileSchema struct { } type MirrorBucketSchema struct { - RemoteDbID string `json:"remoteDbId"` - RemoteBucketKey string `json:"remoteBucketKey"` - HubAddr string `json:"HubAddr"` + RemoteDbID string `json:"remoteDbId"` + RemoteBucketKey string `json:"remoteBucketKey"` + HubAddr string `json:"HubAddr"` + RemoteBucketSlug string `json:"remoteBucketSlug"` } const mirrorFileModelName = "MirrorFile" @@ -50,6 +51,7 @@ func (m *model) CreateMirrorBucket(ctx context.Context, bucketSlug string, mirro bucket.RemoteDbID = mirrorBucket.RemoteDbID bucket.HubAddr = mirrorBucket.HubAddr bucket.RemoteBucketKey = mirrorBucket.RemoteBucketKey + bucket.RemoteBucketSlug = mirrorBucket.RemoteBucketSlug instances := client.Instances{bucket} @@ -151,7 +153,6 @@ func (m *model) CreateMirrorFile(ctx context.Context, mirrorFile *domain.MirrorF if err != nil { return nil, err } - log.Debug("stored mirror file with dbid " + newInstance.DbID) id := res[0] return &MirrorFileSchema{ @@ -197,19 +198,16 @@ func (m *model) initMirrorFileModel(ctx context.Context) (context.Context, *thre return nil, nil, err } - if err = m.threads.NewDB(metaCtx, *dbID); err != nil { - log.Debug("initMirrorFileModel: db already exists") - } - if err := m.threads.NewCollection(metaCtx, *dbID, db.CollectionConfig{ + m.threads.NewDB(metaCtx, *dbID) + + m.threads.NewCollection(metaCtx, *dbID, db.CollectionConfig{ Name: mirrorFileModelName, Schema: util.SchemaFromInstance(&MirrorFileSchema{}, false), Indexes: []db.Index{{ Path: "path", Unique: true, // TODO: multicolumn index }}, - }); err != nil { - log.Debug("initMirrorFileModel: collection already exists") - } + }) return metaCtx, dbID, nil } diff --git a/core/textile/notifier/notifier.go b/core/textile/notifier/notifier.go new file mode 100644 index 00000000..539442e3 --- /dev/null +++ b/core/textile/notifier/notifier.go @@ -0,0 +1,20 @@ +package notifier + +import ( + "github.com/FleekHQ/space-daemon/core/textile/sync" + "github.com/ipfs/interface-go-ipfs-core/path" +) + +type Notifier struct { + s sync.Synchronizer +} + +func New(s sync.Synchronizer) *Notifier { + return &Notifier{ + s: s, + } +} + +func (n *Notifier) OnUploadFile(bucketSlug string, bucketPath string, result path.Resolved, root path.Path) { + n.s.NotifyItemAdded(bucketSlug, bucketPath) +} diff --git a/core/textile/secure_bucket_client.go b/core/textile/secure_bucket_client.go index 2e1bad28..e42b7ce7 100644 --- a/core/textile/secure_bucket_client.go +++ b/core/textile/secure_bucket_client.go @@ -3,6 +3,7 @@ package textile import ( "context" "errors" + "fmt" "io" "regexp" "strings" @@ -131,7 +132,6 @@ func (s *SecureBucketClient) overwriteDecryptedItem(ctx context.Context, item *b if err != nil { return err } - log.Debug("Processing Result Item", "name:"+item.Name, "path:"+item.Path) if item.Name == ".textileseed" || item.Name == ".textile" { return nil } @@ -149,7 +149,6 @@ func (s *SecureBucketClient) overwriteDecryptedItem(ctx context.Context, item *b return err } } - log.Debug("Processed Result Item", "name:"+item.Name, "path:"+item.Path) // Item size is generally (content size + hmac (64 bytes)) if item.Size >= 64 { @@ -185,7 +184,7 @@ func (s *SecureBucketClient) ListPath(ctx context.Context, key, path string) (*b err = s.overwriteDecryptedItem(ctx, item) if err != nil { // Don't error on a single file not decrypted - log.Error("Error decrypting a file", err) + log.Debug(fmt.Sprintf("Error decrypting a file: %s", err.Error())) } } @@ -193,7 +192,7 @@ func (s *SecureBucketClient) ListPath(ctx context.Context, key, path string) (*b err = s.overwriteDecryptedItem(ctx, result.Item) if err != nil { // Don't error on a single file not decrypted - log.Error("Error decrypting a file", err) + log.Debug(fmt.Sprintf("Error decrypting a file: %s", err.Error())) } return result, nil diff --git a/core/textile/sharing.go b/core/textile/sharing.go index 7a2318f8..f9ef13e2 100644 --- a/core/textile/sharing.go +++ b/core/textile/sharing.go @@ -246,7 +246,7 @@ func (tc *textileClient) GetPathAccessRoles(ctx context.Context, b Bucket, path return []domain.Member{}, nil } - log.Debug(fmt.Sprintf("PullPathAccessRoles roles=%+v", rs)) + // log.Debug(fmt.Sprintf("PullPathAccessRoles roles=%+v", rs)) members := make([]domain.Member, 0) for pubk, _ := range rs { diff --git a/core/textile/sync/mirror.go b/core/textile/sync/mirror.go new file mode 100644 index 00000000..eb4f7598 --- /dev/null +++ b/core/textile/sync/mirror.go @@ -0,0 +1,151 @@ +package sync + +import ( + "context" + "fmt" + + "github.com/FleekHQ/space-daemon/config" + "github.com/FleekHQ/space-daemon/core/space/domain" + "github.com/FleekHQ/space-daemon/core/textile/model" + "github.com/FleekHQ/space-daemon/core/textile/utils" + "github.com/FleekHQ/space-daemon/log" + "github.com/textileio/go-threads/core/thread" + "github.com/textileio/go-threads/db" + bucketsClient "github.com/textileio/textile/api/buckets/client" + "github.com/textileio/textile/buckets" +) + +const mirrorThreadKeyName = "mirrorV1" + +func (s *synchronizer) setMirrorFileBackup(ctx context.Context, path, bucketSlug string) error { + mf, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug) + if err != nil { + return err + } + if mf != nil { + // update + mf.Backup = true + + _, err = s.model.UpdateMirrorFile(ctx, mf) + if err != nil { + return err + } + } else { + // create + mf := &domain.MirrorFile{ + Path: path, + BucketSlug: bucketSlug, + Backup: true, + Shared: false, + } + + _, err := s.model.CreateMirrorFile(ctx, mf) + if err != nil { + return err + } + } + + return nil +} + +// unset mirror file as backup +func (s *synchronizer) unsetMirrorFileBackup(ctx context.Context, path, bucketSlug string) error { + mf, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucketSlug) + if err != nil { + return err + } + if mf == nil { + log.Warn(fmt.Sprintf("mirror file (path=%+v bucketSlug=%+v) does not exist", path, bucketSlug)) + return nil + } + + // do not delete the instance because it might be shared + mf.Backup = false + + _, err = s.model.UpdateMirrorFile(ctx, mf) + if err != nil { + return err + } + + return nil +} + +func (s *synchronizer) addCurrentUserAsFileOwner(ctx context.Context, bucket, path string) error { + bucketModel, err := s.model.FindBucket(ctx, bucket) + if err != nil { + return err + } + + roles := make(map[string]buckets.Role) + pk, err := s.kc.GetStoredPublicKey() + if err != nil { + return err + } + tpk := thread.NewLibp2pPubKey(pk) + roles[tpk.String()] = buckets.Admin + + mirror, err := s.getMirrorBucket(ctx, bucket) + if err != nil { + return err + } + + bucketsClient := mirror.GetClient() + bucketCtx, _, err := s.getBucketCtx(ctx, bucketModel.RemoteDbID, bucketModel.RemoteBucketSlug, true, bucketModel.EncryptionKey) + if err != nil { + return err + } + + return bucketsClient.PushPathAccessRoles(bucketCtx, mirror.GetData().Key, path, roles) +} + +// Creates a mirror bucket. +func (s *synchronizer) createMirrorBucket(ctx context.Context, slug string, enckey []byte) (*model.MirrorBucketSchema, error) { + newSlug := slug + "_mirror" + log.Debug("Creating a new mirror bucket with slug " + newSlug) + dbID, err := s.createMirrorThread(ctx, newSlug) + if err != nil { + return nil, err + } + + hubCtx, _, err := s.getBucketCtx(ctx, utils.CastDbIDToString(*dbID), newSlug, true, enckey) + if err != nil { + return nil, err + } + + b, err := s.hubBuckets.Create(hubCtx, bucketsClient.WithName(newSlug), bucketsClient.WithPrivate(true)) + if err != nil { + return nil, err + } + + return &model.MirrorBucketSchema{ + RemoteDbID: utils.CastDbIDToString(*dbID), + RemoteBucketKey: b.Root.Key, + RemoteBucketSlug: newSlug, + HubAddr: s.cfg.GetString(config.TextileHubTarget, ""), + }, nil +} + +// Creates a remote hub thread for the mirror bucket +func (s *synchronizer) createMirrorThread(ctx context.Context, slug string) (*thread.ID, error) { + log.Debug("createMirrorThread: Generating a new threadID ...") + var err error + ctx, err = s.hubAuth.GetHubContext(ctx) + if err != nil { + return nil, err + } + + dbID := thread.NewIDV1(thread.Raw, 32) + + managedKey, err := s.kc.GetManagedThreadKey(mirrorThreadKeyName + "_" + slug) + if err != nil { + log.Error("error getting managed thread key", err) + return nil, err + } + + log.Debug("createMirrorThread: Creating Thread DB for bucket at db " + dbID.String()) + if err := s.hubThreads.NewDB(ctx, dbID, db.WithNewManagedThreadKey(managedKey)); err != nil { + return nil, err + } + log.Debug("createMirrorThread: Thread DB Created") + return &dbID, nil +} diff --git a/core/textile/sync/pinning.go b/core/textile/sync/pinning.go new file mode 100644 index 00000000..1bb5b660 --- /dev/null +++ b/core/textile/sync/pinning.go @@ -0,0 +1,139 @@ +package sync + +import ( + "context" + "io" + "strings" + + "github.com/FleekHQ/space-daemon/log" +) + +func (s *synchronizer) uploadFileToRemote(ctx context.Context, bucket, path string) error { + mirror, err := s.getMirrorBucket(ctx, bucket) + if err != nil { + return err + } + + localBucket, err := s.getBucket(ctx, bucket) + if err != nil { + return err + } + + pipeReader, pipeWriter := io.Pipe() + defer pipeReader.Close() + + errc := make(chan error, 1) + // go routine for piping + go func() { + defer close(errc) + defer pipeWriter.Close() + + if err := localBucket.GetFile(ctx, path, pipeWriter); err != nil { + errc <- err + } + + errc <- nil + }() + + _, _, err = mirror.UploadFile(ctx, path, pipeReader) + if err != nil { + return err + } + + if err := <-errc; err != nil { + return err + } + + if err := s.addCurrentUserAsFileOwner(ctx, bucket, path); err != nil { + // not returning since we dont want to halt the whole process + // also acl will still work since they are the owner + // of the thread so this is more for showing members view + log.Error("Unable to push path access roles for owner", err) + } + + return nil +} + +// backup all files in a bucket +func (s *synchronizer) uploadAllFilesInPath(ctx context.Context, bucket, path string) error { + localBucket, err := s.getBucket(ctx, bucket) + if err != nil { + return err + } + + dir, err := localBucket.ListDirectory(ctx, path) + if err != nil { + return err + } + + for _, item := range dir.Item.Items { + if item.Name == ".textileseed" || item.Name == ".textile" { + continue + } + + if item.IsDir { + p := strings.Join([]string{path, item.Name}, "/") + + err := s.uploadAllFilesInPath(ctx, bucket, p) + if err != nil { + return err + } + + continue + } + + // If the current item is a file, we add it to the queue so that it both gets pinned and synced + s.NotifyItemAdded(bucket, path) + + } + + return nil +} + +func (s *synchronizer) deleteFileFromRemote(ctx context.Context, bucket, path string) (err error) { + mirrorBucket, err := s.getMirrorBucket(ctx, bucket) + if err != nil { + return err + } + + _, err = mirrorBucket.DeleteDirOrFile(ctx, path) + if err != nil { + return err + } + + return nil +} + +func (s *synchronizer) deleteAllFilesInPath(ctx context.Context, bucket, path string) error { + localBucket, err := s.getBucket(ctx, bucket) + if err != nil { + return err + } + + dir, err := localBucket.ListDirectory(ctx, path) + if err != nil { + return err + } + + for _, item := range dir.Item.Items { + if item.Name == ".textileseed" || item.Name == ".textile" { + continue + } + + if item.IsDir { + p := strings.Join([]string{path, item.Name}, "/") + + err := s.deleteAllFilesInPath(ctx, bucket, p) + if err != nil { + return err + } + + continue + } + + // If the current item is a file, we add it to the queue so that it both gets pinned and synced + s.NotifyItemRemoved(bucket, path) + } + + return nil +} diff --git a/core/textile/sync/queue.go b/core/textile/sync/queue.go new file mode 100644 index 00000000..5d8c1b96 --- /dev/null +++ b/core/textile/sync/queue.go @@ -0,0 +1,130 @@ +package sync + +import ( + "container/list" + "encoding/json" + "fmt" +) + +const QueueStoreKey = "TextileSyncTaskQueue" + +type marshalledQueue struct { + QueueAsSlice []Task `json:"queueAsSlice"` + FileQueueAsSlice []Task `json:"fileQueueAsSlice"` +} + +func (s *synchronizer) enqueueTask(task *Task, queue *list.List) { + if s.isTaskEnqueued(task) == false { + queue.PushBack(task) + s.queueHashMap[task.ID] = task + } +} + +func (s *synchronizer) dequeueTask(queue *list.List) *Task { + queueItem := queue.Front() + s.taskQueue.Remove(queueItem) + + task := queueItem.Value.(*Task) + delete(s.queueHashMap, task.ID) + + return task +} + +func (s *synchronizer) storeQueue() error { + // Store main queue + queueAsSlice := []Task{} + currEl := s.taskQueue.Front() + + for currEl != nil { + queueAsSlice = append(queueAsSlice, *currEl.Value.(*Task)) + currEl = currEl.Next() + } + + // Store file pinning queue + fileQueueAsSlice := []Task{} + currEl = s.filePinningQueue.Front() + + for currEl != nil { + fileQueueAsSlice = append(fileQueueAsSlice, *currEl.Value.(*Task)) + currEl = currEl.Next() + } + + objToMarshal := &marshalledQueue{ + QueueAsSlice: queueAsSlice, + FileQueueAsSlice: fileQueueAsSlice, + } + + marshalled, err := json.Marshal(objToMarshal) + if err != nil { + return err + } + + err = s.st.Set([]byte(QueueStoreKey), marshalled) + if err != nil { + return err + } + + return nil +} + +func (s *synchronizer) restoreQueue() error { + queueMutex1 := s.queueMutexMap[s.taskQueue] + queueMutex2 := s.queueMutexMap[s.filePinningQueue] + queueMutex1.Lock() + queueMutex2.Lock() + defer queueMutex1.Unlock() + defer queueMutex2.Unlock() + + data, err := s.st.Get([]byte(QueueStoreKey)) + if err != nil { + return err + } + + queue := &marshalledQueue{} + err = json.Unmarshal(data, queue) + if err != nil { + return err + } + + for _, el := range queue.QueueAsSlice { + s.enqueueTask(&el, s.taskQueue) + } + + for _, el := range queue.FileQueueAsSlice { + s.enqueueTask(&el, s.filePinningQueue) + } + + return nil +} + +func (s *synchronizer) isTaskEnqueued(task *Task) bool { + if s.queueHashMap[task.ID] != nil { + return true + } + + return false +} + +func (s *synchronizer) queueString(queue *list.List) string { + queueName := "buckets" + if queue == s.filePinningQueue { + queueName = "file pinning" + } + + failed, queued, pending := 0, 0, 0 + + for curr := queue.Front(); curr != nil; curr = curr.Next() { + task := curr.Value.(*Task) + + switch task.State { + case taskPending: + pending++ + case taskFailed: + failed++ + case taskQueued: + queued++ + } + } + + return fmt.Sprintf("Textile sync [%s]: Total: %d, Queued: %d, Pending: %d, Failed: %d", queueName, queue.Len(), queued, pending, failed) +} diff --git a/core/textile/sync/sync.go b/core/textile/sync/sync.go new file mode 100644 index 00000000..8d3d6055 --- /dev/null +++ b/core/textile/sync/sync.go @@ -0,0 +1,17 @@ +package sync + +import ( + "context" +) + +type Synchronizer interface { + NotifyItemAdded(bucket, path string) + NotifyItemRemoved(bucket, path string) + NotifyBucketCreated(bucket string, enckey []byte) + NotifyBucketBackupOn(bucket string) + NotifyBucketBackupOff(bucket string) + Start(ctx context.Context) + RestoreQueue() error + Shutdown() + String() string +} diff --git a/core/textile/sync/sync_test.go b/core/textile/sync/sync_test.go new file mode 100644 index 00000000..82963529 --- /dev/null +++ b/core/textile/sync/sync_test.go @@ -0,0 +1,128 @@ +package sync_test + +import ( + "context" + "errors" + sy "sync" + "testing" + + "github.com/FleekHQ/space-daemon/core/textile" + "github.com/FleekHQ/space-daemon/core/textile/bucket" + "github.com/FleekHQ/space-daemon/core/textile/sync" + "github.com/FleekHQ/space-daemon/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/textileio/go-threads/core/thread" +) + +var ( + mockStore *mocks.Store + mockClient *mocks.Client + mockModel *mocks.Model + mockKeychain *mocks.Keychain + mockHubAuth *mocks.HubAuth + mockCfg *mocks.Config + mockRemoteFile = &textile.GetBucketForRemoteFileInput{ + Bucket: "", + DbID: "", + Path: "", + } +) + +func initSync(t *testing.T) sync.Synchronizer { + mockStore = new(mocks.Store) + mockModel = new(mocks.Model) + mockKeychain = new(mocks.Keychain) + mockHubAuth = new(mocks.HubAuth) + mockCfg = new(mocks.Config) + mockClient = new(mocks.Client) + + mockStore.On("IsOpen").Return(true) + + getLocalBucketFn := func(ctx context.Context, slug string) (bucket.BucketInterface, error) { + return mockClient.GetBucket(ctx, slug, nil) + } + + getMirrorBucketFn := func(ctx context.Context, slug string) (bucket.BucketInterface, error) { + return mockClient.GetBucket(ctx, slug, mockRemoteFile) + } + + getBucketCtxFn := func(ctx context.Context, sDbID string, bucketSlug string, ishub bool, enckey []byte) (context.Context, *thread.ID, error) { + return ctx, nil, nil + } + + s := sync.New(mockStore, mockModel, mockKeychain, mockHubAuth, nil, nil, nil, mockCfg, getMirrorBucketFn, getLocalBucketFn, getBucketCtxFn) + + return s +} + +var mutex = &sy.Mutex{} + +func TestSync_ProcessTask(t *testing.T) { + mutex.Lock() + defer mutex.Unlock() + + s := initSync(t) + ctx := context.Background() + + s.NotifyItemAdded("Bucket", "path") + + // Makes the processAddItem and processPinFilefail right away + mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + + mockStore.On("Set", mock.Anything, mock.Anything).Return(nil) + + s.Start(ctx) + + s.Shutdown() + + expectedState := "Textile sync [file pinning]: Total: 1, Queued: 1, Pending: 0, Failed: 0\nTextile sync [buckets]: Total: 1, Queued: 1, Pending: 0, Failed: 0\n" + + assert.Equal(t, expectedState, s.String()) + mockModel.AssertExpectations(t) + mockClient.AssertExpectations(t) +} + +func TestSync_Restore(t *testing.T) { + mutex.Lock() + defer mutex.Unlock() + + s := initSync(t) + ctx := context.Background() + + s.NotifyItemAdded("Bucket", "path") + + // Makes the processAddItem and processPinFilefail right away + mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + + mockStore.On("Set", []byte(sync.QueueStoreKey), mock.Anything).Return(nil) + + s.Start(ctx) + + s.Shutdown() + + ogMockStore := mockStore + + s2 := initSync(t) + + // Make Store.Get return the data set previously + storeArgs := ogMockStore.Calls[0].Arguments + bytes := storeArgs.Get(1) + mockStore.On("Get", []byte(sync.QueueStoreKey)).Return(bytes, nil) + + err := s2.RestoreQueue() + + mockModel.On("FindBucket", mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + mockClient.On("GetBucket", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) + mockStore.On("Set", []byte(sync.QueueStoreKey), mock.Anything).Return(nil) + + // Note we are not calling NotifyItemAdded therefore the state must be picked from the Restore func + s2.Start(ctx) + + s2.Shutdown() + + assert.Nil(t, err) + assert.Equal(t, s.String(), s2.String()) +} diff --git a/core/textile/sync/synchronizer.go b/core/textile/sync/synchronizer.go new file mode 100644 index 00000000..bb941436 --- /dev/null +++ b/core/textile/sync/synchronizer.go @@ -0,0 +1,350 @@ +package sync + +import ( + "container/list" + "context" + "encoding/hex" + "errors" + "fmt" + "sync" + "time" + + "github.com/FleekHQ/space-daemon/config" + "github.com/FleekHQ/space-daemon/core/keychain" + "github.com/FleekHQ/space-daemon/core/store" + "github.com/FleekHQ/space-daemon/core/textile/bucket" + "github.com/FleekHQ/space-daemon/core/textile/hub" + "github.com/FleekHQ/space-daemon/core/textile/model" + "github.com/FleekHQ/space-daemon/log" + threadsClient "github.com/textileio/go-threads/api/client" + "github.com/textileio/go-threads/core/thread" + nc "github.com/textileio/go-threads/net/api/client" + bucketsClient "github.com/textileio/textile/api/buckets/client" +) + +type GetMirrorBucketFn func(ctx context.Context, slug string) (bucket.BucketInterface, error) +type GetBucketFn func(ctx context.Context, slug string) (bucket.BucketInterface, error) +type GetBucketCtxFn func(ctx context.Context, sDbID string, bucketSlug string, ishub bool, enckey []byte) (context.Context, *thread.ID, error) + +const maxParallelTasks = 16 + +type synchronizer struct { + taskQueue *list.List + filePinningQueue *list.List + queueHashMap map[string]*Task + st store.Store + model model.Model + syncNeeded chan (bool) + shuttingDownMap map[*list.List]chan (bool) + queueMutexMap map[*list.List]*sync.Mutex + getMirrorBucket GetMirrorBucketFn + getBucket GetBucketFn + getBucketCtx GetBucketCtxFn + kc keychain.Keychain + hubAuth hub.HubAuth + hubBuckets *bucketsClient.Client + hubThreads *threadsClient.Client + cfg config.Config + netc *nc.Client + queueWg *sync.WaitGroup +} + +// Creates a new Synchronizer +func New( + st store.Store, + model model.Model, + kc keychain.Keychain, + hubAuth hub.HubAuth, + hb *bucketsClient.Client, + ht *threadsClient.Client, + netc *nc.Client, + cfg config.Config, + getMirrorBucket GetMirrorBucketFn, + getBucket GetBucketFn, + getBucketCtx GetBucketCtxFn, +) *synchronizer { + taskQueue := list.New() + filePinningQueue := list.New() + + queueMutexMap := make(map[*list.List]*sync.Mutex) + queueMutexMap[taskQueue] = &sync.Mutex{} + queueMutexMap[filePinningQueue] = &sync.Mutex{} + + shuttingDownMap := make(map[*list.List]chan bool) + shuttingDownMap[taskQueue] = make(chan bool) + shuttingDownMap[filePinningQueue] = make(chan bool) + + queueWg := &sync.WaitGroup{} + + return &synchronizer{ + taskQueue: taskQueue, + filePinningQueue: filePinningQueue, + queueHashMap: make(map[string]*Task), + st: st, + model: model, + syncNeeded: make(chan bool), + shuttingDownMap: shuttingDownMap, + queueMutexMap: queueMutexMap, + getMirrorBucket: getMirrorBucket, + getBucket: getBucket, + getBucketCtx: getBucketCtx, + kc: kc, + hubAuth: hubAuth, + hubBuckets: hb, + hubThreads: ht, + cfg: cfg, + netc: netc, + queueWg: queueWg, + } +} + +// Notify Textile synchronizer that an add item operation needs to be synced +func (s *synchronizer) NotifyItemAdded(bucket, path string) { + t := newTask(addItemTask, []string{bucket, path}) + s.enqueueTask(t, s.taskQueue) + + pft := newTask(pinFileTask, []string{bucket, path}) + pft.Parallelizable = true + s.enqueueTask(pft, s.filePinningQueue) + + s.notifySyncNeeded() +} + +// Notify Textile synchronizer that a remove item operation needs to be synced +func (s *synchronizer) NotifyItemRemoved(bucket, path string) { + t := newTask(removeItemTask, []string{bucket, path}) + s.enqueueTask(t, s.taskQueue) + + uft := newTask(unpinFileTask, []string{bucket, path}) + uft.Parallelizable = true + s.enqueueTask(uft, s.filePinningQueue) + + s.notifySyncNeeded() +} + +func (s *synchronizer) NotifyBucketCreated(bucket string, enckey []byte) { + t := newTask(createBucketTask, []string{bucket, hex.EncodeToString(enckey)}) + s.enqueueTask(t, s.taskQueue) + s.notifySyncNeeded() +} + +func (s *synchronizer) NotifyBucketBackupOn(bucket string) { + t := newTask(bucketBackupOnTask, []string{bucket}) + s.enqueueTask(t, s.taskQueue) + + s.notifySyncNeeded() +} + +func (s *synchronizer) NotifyBucketBackupOff(bucket string) { + t := newTask(bucketBackupOffTask, []string{bucket}) + s.enqueueTask(t, s.taskQueue) + + s.notifySyncNeeded() +} + +func (s *synchronizer) notifySyncNeeded() { + select { + case s.syncNeeded <- true: + default: + } +} + +// Starts the synchronizer, which will constantly be checking if there are syncing tasks pending +func (s *synchronizer) Start(ctx context.Context) { + s.queueWg.Add(2) + // Sync loop + go func() { + s.startSyncLoop(ctx, s.taskQueue) + s.queueWg.Done() + }() + go func() { + s.startSyncLoop(ctx, s.filePinningQueue) + s.queueWg.Done() + }() +} + +// Restores a previously initialized queue +func (s *synchronizer) RestoreQueue() error { + if err := s.restoreQueue(); err != nil { + return err + } + + return nil +} + +func (s *synchronizer) startSyncLoop(ctx context.Context, queue *list.List) { + queueMutex := s.queueMutexMap[queue] + // Initial sync + queueMutex.Lock() + s.sync(ctx, queue) + queueMutex.Unlock() + +Loop: + for { + queueMutex.Lock() + timeAfterNextSync := 30 * time.Second + + select { + case <-time.After(timeAfterNextSync): + s.sync(ctx, queue) + + case <-s.syncNeeded: + s.sync(ctx, queue) + + // Break execution in case of shutdown + case <-ctx.Done(): + queueMutex.Unlock() + s.Shutdown() + break Loop + case <-s.shuttingDownMap[queue]: + queueMutex.Unlock() + break Loop + } + + queueMutex.Unlock() + } +} + +func (s *synchronizer) Shutdown() { + s.shuttingDownMap[s.taskQueue] <- true + s.shuttingDownMap[s.filePinningQueue] <- true + s.queueWg.Wait() + + if err := s.storeQueue(); err != nil { + log.Error("Error while storing Textile task queue state", err) + } + + close(s.shuttingDownMap[s.taskQueue]) + close(s.shuttingDownMap[s.filePinningQueue]) + close(s.syncNeeded) +} + +func (s *synchronizer) String() string { + queues := []*list.List{s.filePinningQueue, s.taskQueue} + + res := "" + for _, q := range queues { + res = res + s.queueString(q) + "\n" + } + + return res +} + +var errMaxRetriesSurpassed = errors.New("max retries surpassed") + +func (s *synchronizer) executeTask(ctx context.Context, t *Task) error { + var err error + + switch t.Type { + case addItemTask: + err = s.processAddItem(ctx, t) + case removeItemTask: + err = s.processRemoveItem(ctx, t) + case pinFileTask: + err = s.processPinFile(ctx, t) + case unpinFileTask: + err = s.processUnpinFile(ctx, t) + case createBucketTask: + err = s.processCreateBucket(ctx, t) + case bucketBackupOnTask: + err = s.processBucketBackupOn(ctx, t) + case bucketBackupOffTask: + err = s.processBucketBackupOff(ctx, t) + default: + log.Warn("Unexpected action on Textile sync, executeTask") + } + + if err != nil { + t.State = taskFailed + t.Retries++ + + // Remove from queue if it surpassed the max amount of retries + if t.MaxRetries != -1 && t.Retries > t.MaxRetries { + t.State = taskDequeued + return errMaxRetriesSurpassed + } + + // Retry task + t.State = taskQueued + } else { + t.State = taskSucceeded + } + + return err +} + +func (s *synchronizer) sync(ctx context.Context, queue *list.List) error { + queueName := "buckets" + if queue == s.filePinningQueue { + queueName = "file pinning" + } + + log.Debug(fmt.Sprintf("Textile sync [%s]: Sync start", queueName)) + log.Debug(s.queueString(queue)) + + parallelTaskCount := 0 + ptWg := sync.WaitGroup{} + + for curr := queue.Front(); curr != nil; curr = curr.Next() { + task := curr.Value.(*Task) + + if task.State != taskQueued { + // If task is already in process or finished, skip + continue + } + log.Debug(fmt.Sprintf("Textile sync [%s]: Processing task %s", queueName, task.Type)) + task.State = taskPending + + handleExecResult := func(err error) { + if err == nil { + // Task completed successfully + log.Debug(fmt.Sprintf("Textile sync [%s]: task completed succesfully", queueName)) + } else { + log.Error(fmt.Sprintf("Textile sync [%s]: task failed", queueName), err) + } + } + + if task.Parallelizable && parallelTaskCount < maxParallelTasks { + parallelTaskCount++ + ptWg.Add(1) + + go func() { + err := s.executeTask(ctx, task) + handleExecResult(err) + parallelTaskCount-- + ptWg.Done() + }() + } else { + err := s.executeTask(ctx, task) + handleExecResult(err) + + if err != nil { + // Break from the loop (avoid executing next tasks) + return err + } + } + } + + ptWg.Wait() + + // Remove successful and dequeued tasks from queue + curr := queue.Front() + for curr != nil { + task := curr.Value.(*Task) + next := curr.Next() + + switch task.State { + case taskDequeued: + queue.Remove(curr) + case taskSucceeded: + queue.Remove(curr) + default: + } + + curr = next + } + + log.Debug(fmt.Sprintf("Textile sync [%s]: Sync end", queueName)) + + return nil +} diff --git a/core/textile/sync/task-executors.go b/core/textile/sync/task-executors.go new file mode 100644 index 00000000..08e98d8a --- /dev/null +++ b/core/textile/sync/task-executors.go @@ -0,0 +1,146 @@ +package sync + +import ( + "context" + "encoding/hex" + "errors" + + "github.com/FleekHQ/space-daemon/core/textile/utils" +) + +func checkTaskType(t *Task, tp taskType) error { + if tp != t.Type { + return errors.New("expected different task type at Textile synchronizer") + } + + return nil +} + +func (s *synchronizer) processAddItem(ctx context.Context, task *Task) error { + if err := checkTaskType(task, addItemTask); err != nil { + return err + } + + bucket := task.Args[0] + path := task.Args[1] + + bucketModel, err := s.model.FindBucket(ctx, bucket) + if err != nil { + return err + } + + mirrorFile, err := s.model.FindMirrorFileByPathAndBucketSlug(ctx, path, bucket) + + if bucketModel.Backup && mirrorFile == nil { + if err := s.setMirrorFileBackup(ctx, path, bucket); err != nil { + return err + } + } + + return nil +} + +func (s *synchronizer) processRemoveItem(ctx context.Context, task *Task) error { + if err := checkTaskType(task, removeItemTask); err != nil { + return err + } + + // bucket := task.Args[0] + // path := task.Args[1] + + // TODO: Remove file from mirror + return nil +} + +func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error { + if err := checkTaskType(task, pinFileTask); err != nil { + return err + } + + bucket := task.Args[0] + path := task.Args[1] + + err := s.uploadFileToRemote(ctx, bucket, path) + + return err +} + +func (s *synchronizer) processUnpinFile(ctx context.Context, task *Task) error { + if err := checkTaskType(task, unpinFileTask); err != nil { + return err + } + + bucket := task.Args[0] + path := task.Args[1] + + err := s.deleteFileFromRemote(ctx, bucket, path) + + return err +} + +func (s *synchronizer) processCreateBucket(ctx context.Context, task *Task) error { + if err := checkTaskType(task, createBucketTask); err != nil { + return err + } + + bucket := task.Args[0] + enckey, err := hex.DecodeString(task.Args[1]) + if err != nil { + return err + } + + mirror, err := s.createMirrorBucket(ctx, bucket, enckey) + if mirror != nil { + _, err = s.model.CreateMirrorBucket(ctx, bucket, mirror) + } + + return err +} + +func (s *synchronizer) processBucketBackupOn(ctx context.Context, task *Task) error { + if err := checkTaskType(task, bucketBackupOnTask); err != nil { + return err + } + + bucket := task.Args[0] + + bucketModel, err := s.model.FindBucket(ctx, bucket) + if err != nil { + return err + } + + dbID, err := utils.ParseDbIDFromString(bucketModel.DbID) + if err != nil { + return err + } + + if err := s.replicateThreadToHub(ctx, dbID); err != nil { + return err + } + + return s.uploadAllFilesInPath(ctx, bucket, "") +} + +func (s *synchronizer) processBucketBackupOff(ctx context.Context, task *Task) error { + if err := checkTaskType(task, bucketBackupOffTask); err != nil { + return err + } + + bucket := task.Args[0] + + bucketModel, err := s.model.FindBucket(ctx, bucket) + if err != nil { + return err + } + + dbID, err := utils.ParseDbIDFromString(bucketModel.DbID) + if err != nil { + return err + } + + if err := s.dereplicateThreadFromHub(ctx, dbID); err != nil { + return err + } + + return s.deleteAllFilesInPath(ctx, bucket, "") +} diff --git a/core/textile/sync/task.go b/core/textile/sync/task.go new file mode 100644 index 00000000..a3488818 --- /dev/null +++ b/core/textile/sync/task.go @@ -0,0 +1,53 @@ +package sync + +import ( + "strings" +) + +type taskType string + +const ( + addItemTask taskType = "ADD_ITEM" + removeItemTask taskType = "REMOVE_ITEM" + createBucketTask taskType = "CREATE_BUCKET" + pinFileTask taskType = "PIN_FILE" + unpinFileTask taskType = "UNPIN_FILE" + bucketBackupOnTask taskType = "TOGGLE_BACKUP_ON" + bucketBackupOffTask taskType = "TOGGLE_BACKUP_OFF" +) + +type taskState string + +const ( + taskQueued taskState = "QUEUED" + taskPending taskState = "PENDING" + taskSucceeded taskState = "SUCCESS" + taskFailed taskState = "FAILED" + taskDequeued taskState = "DEQUEUED" +) + +type Task struct { + ID string `json:"id"` + State taskState `json:"state"` + Type taskType `json:"type"` + Args []string `json:"args"` + Parallelizable bool `json:"parallelizable"` + + // Set to -1 for infinite retries + MaxRetries int `json:"maxRetries"` + Retries int `json:"retries"` +} + +func newTask(t taskType, args []string) *Task { + id := string(t) + "_" + strings.Join(args, "_") + + return &Task{ + ID: id, + State: taskQueued, + Type: t, + Args: args, + Parallelizable: false, + MaxRetries: -1, + Retries: 0, + } +} diff --git a/core/textile/replica.go b/core/textile/sync/threads.go similarity index 53% rename from core/textile/replica.go rename to core/textile/sync/threads.go index 39043e93..307e1e99 100644 --- a/core/textile/replica.go +++ b/core/textile/sync/threads.go @@ -1,24 +1,23 @@ -package textile +package sync import ( "context" "fmt" "github.com/FleekHQ/space-daemon/config" - "github.com/textileio/go-threads/core/thread" "github.com/textileio/textile/cmd" ) // replicate a local thread on the hub -func (tc *textileClient) ReplicateThreadToHub(ctx context.Context, dbID *thread.ID) error { +func (s *synchronizer) replicateThreadToHub(ctx context.Context, dbID *thread.ID) error { - hubma := tc.cfg.GetString(config.TextileHubMa, "") + hubma := s.cfg.GetString(config.TextileHubMa, "") if hubma == "" { return fmt.Errorf("no textile hub set") } - _, err := tc.netc.AddReplicator(ctx, *dbID, cmd.AddrFromStr(hubma)) + _, err := s.netc.AddReplicator(ctx, *dbID, cmd.AddrFromStr(hubma)) if err != nil { return err } @@ -27,7 +26,7 @@ func (tc *textileClient) ReplicateThreadToHub(ctx context.Context, dbID *thread. } // dereplicate a local thread from the hub -func (tc *textileClient) DereplicateThreadFromHub(ctx context.Context, dbID *thread.ID) error { +func (s *synchronizer) dereplicateThreadFromHub(ctx context.Context, dbID *thread.ID) error { // TODO diff --git a/core/textile/textile.go b/core/textile/textile.go index 1ee0cdff..15e288da 100644 --- a/core/textile/textile.go +++ b/core/textile/textile.go @@ -10,9 +10,7 @@ import ( "github.com/FleekHQ/space-daemon/core/space/domain" "github.com/FleekHQ/space-daemon/core/textile/bucket" "github.com/FleekHQ/space-daemon/core/textile/model" - "github.com/ipfs/interface-go-ipfs-core/path" "github.com/libp2p/go-libp2p-core/crypto" - "github.com/textileio/go-threads/core/thread" "github.com/textileio/go-threads/db" buckets_pb "github.com/textileio/textile/api/buckets/pb" @@ -32,42 +30,7 @@ const ( type BucketRoot buckets_pb.Root type Bucket interface { - Slug() string - Key() string - GetData() bucket.BucketData - GetThreadID(ctx context.Context) (*thread.ID, error) - DirExists(ctx context.Context, path string) (bool, error) - FileExists(ctx context.Context, path string) (bool, error) - UploadFile( - ctx context.Context, - path string, - reader io.Reader, - ) (result path.Resolved, root path.Path, err error) - GetFile( - ctx context.Context, - path string, - w io.Writer, - ) error - CreateDirectory( - ctx context.Context, - path string, - ) (result path.Resolved, root path.Path, err error) - ListDirectory( - ctx context.Context, - path string, - ) (*bucket.DirEntries, error) - DeleteDirOrFile( - ctx context.Context, - path string, - ) (path.Resolved, error) -} - -type backuper interface { - BackupBucket(ctx context.Context, bucket Bucket) (int, error) - BackupFileWithReader(ctx context.Context, bucket Bucket, path string, reader io.Reader) error - UnbackupBucket(ctx context.Context, bucket Bucket) (int, error) - IsBucketBackup(ctx context.Context, bucketSlug string) bool - IsMirrorFile(ctx context.Context, path, bucketSlug string) bool + bucket.BucketInterface } type Client interface { @@ -83,8 +46,6 @@ type Client interface { JoinBucket(ctx context.Context, slug string, ti *domain.ThreadInfo) (bool, error) CreateBucket(ctx context.Context, bucketSlug string) (Bucket, error) ToggleBucketBackup(ctx context.Context, bucketSlug string, bucketBackup bool) (bool, error) - ReplicateThreadToHub(ctx context.Context, dbID *thread.ID) error - DereplicateThreadFromHub(ctx context.Context, dbID *thread.ID) error SendMessage(ctx context.Context, recipient crypto.PubKey, body []byte) (*client.Message, error) Shutdown() error WaitForReady() chan bool @@ -97,13 +58,11 @@ type Client interface { RejectSharedFilesInvitation(ctx context.Context, invitation domain.Invitation) (domain.Invitation, error) RemoveKeys() error AttachMailboxNotifier(notif GrpcMailboxNotifier) - UploadFileToHub(ctx context.Context, b Bucket, path string, reader io.Reader) (result path.Resolved, root path.Path, err error) GetReceivedFiles(ctx context.Context, accepted bool, seek string, limit int) ([]*domain.SharedDirEntry, string, error) GetPathAccessRoles(ctx context.Context, b Bucket, path string) ([]domain.Member, error) GetPublicShareBucket(ctx context.Context) (Bucket, error) DownloadPublicGatewayItem(ctx context.Context, cid cid.Cid) (io.ReadCloser, error) GetFailedHealthchecks() int - backuper } type Buckd interface { diff --git a/go.mod b/go.mod index e629ceb9..2078fd65 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/multiformats/go-multibase v0.0.3 github.com/multiformats/go-multihash v0.0.14 github.com/odeke-em/go-utils v0.0.0-20170224015737-e8ebaed0777a + github.com/odeke-em/go-uuid v0.0.0-20151221120446-b211d769a9aa github.com/pkg/errors v0.9.1 github.com/radovskyb/watcher v1.0.7 github.com/rakyll/statik v0.1.7 diff --git a/mocks/Bucket.go b/mocks/Bucket.go index a64f7d57..8a01acea 100644 --- a/mocks/Bucket.go +++ b/mocks/Bucket.go @@ -118,6 +118,54 @@ func (_m *Bucket) FileExists(ctx context.Context, _a1 string) (bool, error) { return r0, r1 } +// GetClient provides a mock function with given fields: +func (_m *Bucket) GetClient() bucket.BucketsClient { + ret := _m.Called() + + var r0 bucket.BucketsClient + if rf, ok := ret.Get(0).(func() bucket.BucketsClient); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(bucket.BucketsClient) + } + } + + return r0 +} + +// GetContext provides a mock function with given fields: ctx +func (_m *Bucket) GetContext(ctx context.Context) (context.Context, *thread.ID, error) { + ret := _m.Called(ctx) + + var r0 context.Context + if rf, ok := ret.Get(0).(func(context.Context) context.Context); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + var r1 *thread.ID + if rf, ok := ret.Get(1).(func(context.Context) *thread.ID); ok { + r1 = rf(ctx) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*thread.ID) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(ctx) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // GetData provides a mock function with given fields: func (_m *Bucket) GetData() bucket.BucketData { ret := _m.Called() diff --git a/mocks/Client.go b/mocks/Client.go index ef664926..af2d45c2 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.2.1. DO NOT EDIT. +// Code generated by mockery v2.0.0. DO NOT EDIT. package mocks @@ -21,12 +21,8 @@ import ( model "github.com/FleekHQ/space-daemon/core/textile/model" - path "github.com/ipfs/interface-go-ipfs-core/path" - textile "github.com/FleekHQ/space-daemon/core/textile" - thread "github.com/textileio/go-threads/core/thread" - usersclient "github.com/textileio/textile/api/users/client" ) @@ -61,41 +57,6 @@ func (_m *Client) AttachMailboxNotifier(notif textile.GrpcMailboxNotifier) { _m.Called(notif) } -// BackupBucket provides a mock function with given fields: ctx, bucket -func (_m *Client) BackupBucket(ctx context.Context, bucket textile.Bucket) (int, error) { - ret := _m.Called(ctx, bucket) - - var r0 int - if rf, ok := ret.Get(0).(func(context.Context, textile.Bucket) int); ok { - r0 = rf(ctx, bucket) - } else { - r0 = ret.Get(0).(int) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, textile.Bucket) error); ok { - r1 = rf(ctx, bucket) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// BackupFileWithReader provides a mock function with given fields: ctx, bucket, _a2, reader -func (_m *Client) BackupFileWithReader(ctx context.Context, bucket textile.Bucket, _a2 string, reader io.Reader) error { - ret := _m.Called(ctx, bucket, _a2, reader) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, textile.Bucket, string, io.Reader) error); ok { - r0 = rf(ctx, bucket, _a2, reader) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // CreateBucket provides a mock function with given fields: ctx, bucketSlug func (_m *Client) CreateBucket(ctx context.Context, bucketSlug string) (textile.Bucket, error) { ret := _m.Called(ctx, bucketSlug) @@ -119,20 +80,6 @@ func (_m *Client) CreateBucket(ctx context.Context, bucketSlug string) (textile. return r0, r1 } -// DereplicateThreadFromHub provides a mock function with given fields: ctx, dbID -func (_m *Client) DereplicateThreadFromHub(ctx context.Context, dbID *thread.ID) error { - ret := _m.Called(ctx, dbID) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *thread.ID) error); ok { - r0 = rf(ctx, dbID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // DownloadPublicGatewayItem provides a mock function with given fields: ctx, _a1 func (_m *Client) DownloadPublicGatewayItem(ctx context.Context, _a1 cid.Cid) (io.ReadCloser, error) { ret := _m.Called(ctx, _a1) @@ -255,13 +202,13 @@ func (_m *Client) GetModel() model.Model { return r0 } -// GetPathAccessRoles provides a mock function with given fields: ctx, b, _a2 -func (_m *Client) GetPathAccessRoles(ctx context.Context, b textile.Bucket, _a2 string) ([]domain.Member, error) { - ret := _m.Called(ctx, b, _a2) +// GetPathAccessRoles provides a mock function with given fields: ctx, b, path +func (_m *Client) GetPathAccessRoles(ctx context.Context, b textile.Bucket, path string) ([]domain.Member, error) { + ret := _m.Called(ctx, b, path) var r0 []domain.Member if rf, ok := ret.Get(0).(func(context.Context, textile.Bucket, string) []domain.Member); ok { - r0 = rf(ctx, b, _a2) + r0 = rf(ctx, b, path) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]domain.Member) @@ -270,7 +217,7 @@ func (_m *Client) GetPathAccessRoles(ctx context.Context, b textile.Bucket, _a2 var r1 error if rf, ok := ret.Get(1).(func(context.Context, textile.Bucket, string) error); ok { - r1 = rf(ctx, b, _a2) + r1 = rf(ctx, b, path) } else { r1 = ret.Error(1) } @@ -354,20 +301,6 @@ func (_m *Client) GetThreadsConnection() (*client.Client, error) { return r0, r1 } -// IsBucketBackup provides a mock function with given fields: ctx, bucketSlug -func (_m *Client) IsBucketBackup(ctx context.Context, bucketSlug string) bool { - ret := _m.Called(ctx, bucketSlug) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { - r0 = rf(ctx, bucketSlug) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // IsHealthy provides a mock function with given fields: func (_m *Client) IsHealthy() bool { ret := _m.Called() @@ -396,20 +329,6 @@ func (_m *Client) IsInitialized() bool { return r0 } -// IsMirrorFile provides a mock function with given fields: ctx, _a1, bucketSlug -func (_m *Client) IsMirrorFile(ctx context.Context, _a1 string, bucketSlug string) bool { - ret := _m.Called(ctx, _a1, bucketSlug) - - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context, string, string) bool); ok { - r0 = rf(ctx, _a1, bucketSlug) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - // IsRunning provides a mock function with given fields: func (_m *Client) IsRunning() bool { ret := _m.Called() @@ -503,20 +422,6 @@ func (_m *Client) RemoveKeys() error { return r0 } -// ReplicateThreadToHub provides a mock function with given fields: ctx, dbID -func (_m *Client) ReplicateThreadToHub(ctx context.Context, dbID *thread.ID) error { - ret := _m.Called(ctx, dbID) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *thread.ID) error); ok { - r0 = rf(ctx, dbID) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // SendMessage provides a mock function with given fields: ctx, recipient, body func (_m *Client) SendMessage(ctx context.Context, recipient crypto.PubKey, body []byte) (*usersclient.Message, error) { ret := _m.Called(ctx, recipient, body) @@ -540,29 +445,6 @@ func (_m *Client) SendMessage(ctx context.Context, recipient crypto.PubKey, body return r0, r1 } -// SetMirrorFileBackup provides a mock function with given fields: ctx, _a1, bucketSlug -func (_m *Client) SetMirrorFileBackup(ctx context.Context, _a1 string, bucketSlug string) (*domain.MirrorFile, error) { - ret := _m.Called(ctx, _a1, bucketSlug) - - var r0 *domain.MirrorFile - if rf, ok := ret.Get(0).(func(context.Context, string, string) *domain.MirrorFile); ok { - r0 = rf(ctx, _a1, bucketSlug) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*domain.MirrorFile) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, _a1, bucketSlug) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // ShareBucket provides a mock function with given fields: ctx, bucketSlug func (_m *Client) ShareBucket(ctx context.Context, bucketSlug string) (*db.Info, error) { ret := _m.Called(ctx, bucketSlug) @@ -649,73 +531,6 @@ func (_m *Client) ToggleBucketBackup(ctx context.Context, bucketSlug string, buc return r0, r1 } -// UnbackupBucket provides a mock function with given fields: ctx, bucket -func (_m *Client) UnbackupBucket(ctx context.Context, bucket textile.Bucket) (int, error) { - ret := _m.Called(ctx, bucket) - - var r0 int - if rf, ok := ret.Get(0).(func(context.Context, textile.Bucket) int); ok { - r0 = rf(ctx, bucket) - } else { - r0 = ret.Get(0).(int) - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, textile.Bucket) error); ok { - r1 = rf(ctx, bucket) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// UnsetMirrorFileBackup provides a mock function with given fields: ctx, _a1, bucketSlug -func (_m *Client) UnsetMirrorFileBackup(ctx context.Context, _a1 string, bucketSlug string) error { - ret := _m.Called(ctx, _a1, bucketSlug) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, _a1, bucketSlug) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// UploadFileToHub provides a mock function with given fields: ctx, b, _a2, reader -func (_m *Client) UploadFileToHub(ctx context.Context, b textile.Bucket, _a2 string, reader io.Reader) (path.Resolved, path.Path, error) { - ret := _m.Called(ctx, b, _a2, reader) - - var r0 path.Resolved - if rf, ok := ret.Get(0).(func(context.Context, textile.Bucket, string, io.Reader) path.Resolved); ok { - r0 = rf(ctx, b, _a2, reader) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(path.Resolved) - } - } - - var r1 path.Path - if rf, ok := ret.Get(1).(func(context.Context, textile.Bucket, string, io.Reader) path.Path); ok { - r1 = rf(ctx, b, _a2, reader) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(path.Path) - } - } - - var r2 error - if rf, ok := ret.Get(2).(func(context.Context, textile.Bucket, string, io.Reader) error); ok { - r2 = rf(ctx, b, _a2, reader) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - // WaitForHealthy provides a mock function with given fields: func (_m *Client) WaitForHealthy() chan error { ret := _m.Called()