Skip to content

Commit

Permalink
Remove file info stream and add more info to file events (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 authored Nov 5, 2020
1 parent 12defc5 commit df8f9da
Show file tree
Hide file tree
Showing 9 changed files with 1,006 additions and 1,217 deletions.
14 changes: 9 additions & 5 deletions core/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ const (
)

type FileEvent struct {
Info domain.FileInfo
Type FileEventType
Info domain.FileInfo
Type FileEventType
Bucket string
DbID string
}

func NewFileEvent(info domain.FileInfo, eventType FileEventType) FileEvent {
func NewFileEvent(info domain.FileInfo, eventType FileEventType, bucket, dbID string) FileEvent {
return FileEvent{
Info: info,
Type: eventType,
Info: info,
Type: eventType,
Bucket: bucket,
DbID: dbID,
}
}

Expand Down
7 changes: 6 additions & 1 deletion core/textile/sync/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,18 @@ func (s *synchronizer) restoreBucket(ctx context.Context, bucketSlug string) err
}
}

bucketModel, err := s.model.FindBucket(ctx, bucketSlug)
if err != nil {
return err
}

item, err := mirrorBucket.ListDirectory(ctx, itemPath)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), itemPath)
info.BackedUp = true
info.LocallyAvailable = exists
info.RestoreInProgress = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestoring))
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestoring, bucketSlug, bucketModel.DbID))
}

s.NotifyFileRestore(bucketSlug, itemPath)
Expand Down
19 changes: 13 additions & 6 deletions core/textile/sync/task-executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/hex"
"errors"

"fmt"
"path"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -58,7 +57,7 @@ func (s *synchronizer) processAddItem(ctx context.Context, task *Task) error {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackupInProgress = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupInProgress))
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupInProgress, bucket, bucketModel.DbID))
}

pft := newTask(pinFileTask, []string{bucket, path})
Expand Down Expand Up @@ -117,12 +116,17 @@ func (s *synchronizer) processPinFile(ctx context.Context, task *Task) error {
return err
}

bucketModel, err := s.model.FindBucket(ctx, bucket)
if err != nil {
return err
}

item, err := localBucket.ListDirectory(ctx, path)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackedUp = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupReady))
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileBackupReady, bucket, bucketModel.DbID))
}

return nil
Expand Down Expand Up @@ -240,8 +244,6 @@ func (s *synchronizer) processBucketRestoreTask(ctx context.Context, task *Task)
}

func (s *synchronizer) processRestoreFile(ctx context.Context, task *Task) error {
log.Debug(fmt.Sprintf("processRestoreFile: 1"))

if err := checkTaskType(task, restoreFileTask); err != nil {
return err
}
Expand Down Expand Up @@ -275,12 +277,17 @@ func (s *synchronizer) processRestoreFile(ctx context.Context, task *Task) error
return err
}

bucketModel, err := s.model.FindBucket(ctx, bucket)
if err != nil {
return err
}

item, err := mirrorBucket.ListDirectory(ctx, path)
if s.eventNotifier != nil && err == nil {
info := utils.MapDirEntryToFileInfo(api_buckets_pb.ListPathResponse(*item), path)
info.LocallyAvailable = true
info.BackedUp = true
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestored))
s.eventNotifier.SendFileEvent(events.NewFileEvent(info, events.FileRestored, bucket, bucketModel.DbID))
}

return err
Expand Down
1 change: 0 additions & 1 deletion grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type grpcServer struct {
fc *fuse.Controller
// TODO: see if we need to clean this up by gc or handle an array
fileEventStream pb.SpaceApi_SubscribeServer
fileInfoStream pb.SpaceApi_FileInfoSubscribeServer
txlEventStream pb.SpaceApi_TxlSubscribeServer
notificationEventStream pb.SpaceApi_NotificationSubscribeServer
isStarted bool
Expand Down
23 changes: 4 additions & 19 deletions grpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func (srv *grpcServer) SendFileEvent(event events.FileEvent) {
entry := dirEntries[0]

pe := &pb.FileEventResponse{
Type: mapFileEventToPb(event.Type),
Entry: entry,
Type: mapFileEventToPb(event.Type),
Entry: entry,
Bucket: event.Bucket,
DbId: event.DbID,
}

srv.sendFileEvent(pe)
Expand Down Expand Up @@ -178,23 +180,6 @@ func (srv *grpcServer) TxlSubscribe(empty *empty.Empty, stream pb.SpaceApi_TxlSu
return nil
}

func (srv *grpcServer) FileInfoSubscribe(empty *empty.Empty, stream pb.SpaceApi_FileInfoSubscribeServer) error {
srv.registerFileInfoStream(stream)
// waits until request is done
select {
case <-stream.Context().Done():
break
}
// clean up stream
srv.registerFileInfoStream(nil)
log.Info("closing stream")
return nil
}

func (srv *grpcServer) registerFileInfoStream(stream pb.SpaceApi_FileInfoSubscribeServer) {
srv.fileInfoStream = stream
}

func (srv *grpcServer) registerTxlStream(stream pb.SpaceApi_TxlSubscribeServer) {
srv.txlEventStream = stream
}
Expand Down
Loading

0 comments on commit df8f9da

Please sign in to comment.