Skip to content

Commit

Permalink
Fix file subscription events (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 authored Nov 4, 2020
1 parent d811f12 commit 12defc5
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 340 deletions.
18 changes: 6 additions & 12 deletions core/events/events.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package events

import (
"os"
)
import "github.com/FleekHQ/space-daemon/core/space/domain"

// These file defines events that daemon can propagate through all layers

Expand All @@ -25,18 +23,14 @@ const (
)

type FileEvent struct {
Path string
Bucket string
Info os.FileInfo
Type FileEventType
Info domain.FileInfo
Type FileEventType
}

func NewFileEvent(path, bucket string, eventType FileEventType, info os.FileInfo) FileEvent {
func NewFileEvent(info domain.FileInfo, eventType FileEventType) FileEvent {
return FileEvent{
Path: path,
Bucket: bucket,
Type: eventType,
Info: info,
Info: info,
Type: eventType,
}
}

Expand Down
11 changes: 9 additions & 2 deletions core/textile/sync/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/FleekHQ/space-daemon/core/events"
"github.com/FleekHQ/space-daemon/core/textile/bucket"
"github.com/FleekHQ/space-daemon/core/textile/utils"
"github.com/FleekHQ/space-daemon/log"
api_buckets_pb "github.com/textileio/textile/v2/api/buckets/pb"
)

// return the targetBucket if path is newer there, srcBucket otherwise
Expand Down Expand Up @@ -58,8 +60,13 @@ func (s *synchronizer) restoreBucket(ctx context.Context, bucketSlug string) err
}
}

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(itemPath, bucketSlug, events.FileRestoring, nil))
item, err := mirrorBucket.ListDirectory(ctx, itemPath)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), itemPath)
info.BackedUp = true
info.LocallyAvailable = exists
info.RestoreInProgress = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestoring))
}

s.NotifyFileRestore(bucketSlug, itemPath)
Expand Down
35 changes: 29 additions & 6 deletions core/textile/sync/task-executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/FleekHQ/space-daemon/core/textile/model"
api_buckets_pb "github.com/textileio/textile/v2/api/buckets/pb"

"github.com/FleekHQ/space-daemon/log"

Expand Down Expand Up @@ -47,8 +48,17 @@ func (s *synchronizer) processAddItem(ctx context.Context, task *Task) error {
}
}

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupInProgress, nil))
localBucket, err := s.getBucket(ctx, bucket)
if err != nil {
return err
}

item, err := localBucket.ListDirectory(ctx, path)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackupInProgress = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupInProgress))
}

pft := newTask(pinFileTask, []string{bucket, path})
Expand Down Expand Up @@ -102,8 +112,17 @@ func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error {

s.setMirrorFileBackup(ctx, path, bucket, false)

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileBackupReady, nil))
localBucket, err := s.getBucket(ctx, bucket)
if err != nil {
return err
}

item, err := localBucket.ListDirectory(ctx, path)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackedUp = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupReady))
}

return nil
Expand Down Expand Up @@ -256,8 +275,12 @@ func (s *synchronizer) processRestoreFile(ctx context.Context, task *Task) error
return err
}

if s.eventNotifier != nil {
s.eventNotifier.SendFileEvent(events.NewFileEvent(path, bucket, events.FileRestored, nil))
item, err := mirrorBucket.ListDirectory(ctx, path)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackedUp = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestored))
}

return err
Expand Down
29 changes: 29 additions & 0 deletions core/textile/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"encoding/hex"
"errors"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/FleekHQ/space-daemon/config"
"github.com/FleekHQ/space-daemon/core/keychain"
"github.com/FleekHQ/space-daemon/core/space/domain"
"github.com/FleekHQ/space-daemon/core/store"
"github.com/FleekHQ/space-daemon/core/textile/hub"
"github.com/FleekHQ/space-daemon/log"
Expand All @@ -20,6 +24,7 @@ import (
"github.com/textileio/go-threads/core/thread"
"github.com/textileio/go-threads/db"
nc "github.com/textileio/go-threads/net/api/client"
bucketsproto "github.com/textileio/textile/v2/api/buckets/pb"
"github.com/textileio/textile/v2/api/common"
"github.com/textileio/textile/v2/cmd"
"golang.org/x/crypto/pbkdf2"
Expand Down Expand Up @@ -286,3 +291,27 @@ func successfulThreadCreation(st store.Store, dbID *thread.ID, dbIDInBytes, stor

return dbID, nil
}

func MapDirEntryToFileInfo(entry bucketsproto.ListPathResponse, itemPath string) domain.FileInfo {
item := entry.Item
info := domain.FileInfo{
DirEntry: domain.DirEntry{
Path: itemPath,
IsDir: item.IsDir,
Name: item.Name,
SizeInBytes: strconv.FormatInt(item.Size, 10),
FileExtension: strings.Replace(filepath.Ext(item.Name), ".", "", -1),
// FIXME: real created at needed
Created: time.Unix(0, item.Metadata.UpdatedAt).Format(time.RFC3339),
Updated: time.Unix(0, item.Metadata.UpdatedAt).Format(time.RFC3339),
Members: []domain.Member{},
},
IpfsHash: item.Cid,
BackedUp: false,
LocallyAvailable: false,
BackupInProgress: false,
RestoreInProgress: false,
}

return info
}
35 changes: 34 additions & 1 deletion grpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,44 @@ func (srv *grpcServer) sendFileEvent(event *pb.FileEventResponse) {
}

func (srv *grpcServer) SendFileEvent(event events.FileEvent) {
pe := &pb.FileEventResponse{}
dirEntries := mapFileInfoToDirectoryEntry([]domain.FileInfo{event.Info})
entry := dirEntries[0]

pe := &pb.FileEventResponse{
Type: mapFileEventToPb(event.Type),
Entry: entry,
}

srv.sendFileEvent(pe)
}

func mapFileEventToPb(eventType events.FileEventType) pb.EventType {
switch eventType {
case events.FileAdded:
return pb.EventType_ENTRY_ADDED
case events.FileDeleted:
return pb.EventType_ENTRY_DELETED
case events.FileUpdated:
return pb.EventType_ENTRY_UPDATED
case events.FileBackupInProgress:
return pb.EventType_ENTRY_BACKUP_IN_PROGRESS
case events.FileBackupReady:
return pb.EventType_ENTRY_BACKUP_READY
case events.FileRestoring:
return pb.EventType_ENTRY_RESTORE_IN_PROGRESS
case events.FileRestored:
return pb.EventType_ENTRY_RESTORE_READY
case events.FolderAdded:
return pb.EventType_FOLDER_ADDED
case events.FolderDeleted:
return pb.EventType_FOLDER_DELETED
case events.FolderUpdated:
return pb.EventType_FOLDER_UPDATED
default:
return pb.EventType_ENTRY_ADDED
}
}

func (srv *grpcServer) sendTextileEvent(event *pb.TextileEventResponse) {
if srv.txlEventStream != nil {
log.Info("sending events to client")
Expand Down
Loading

0 comments on commit 12defc5

Please sign in to comment.