Skip to content

Commit

Permalink
Make local file cache depend on CID to enable remote updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dmerrill6 committed Nov 3, 2020
1 parent 016aa8e commit 63977d4
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 34 deletions.
1 change: 1 addition & 0 deletions core/space/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type AddWatchFile struct {
BucketKey string `json:"bucket_key"`
BucketSlug string `json:"bucket_slug"`
IsRemote bool `json:"isRemote"`
Cid string `json:"cid"`
}

type Identity struct {
Expand Down
2 changes: 1 addition & 1 deletion core/space/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Space struct {

type Syncer interface {
AddFileWatch(addFileInfo domain.AddWatchFile) error
GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool)
GetOpenFilePath(bucketSlug, bucketPath, dbID, cid string) (string, bool)
}

type AddFileWatchFunc = func(addFileInfo domain.AddWatchFile) error
Expand Down
21 changes: 10 additions & 11 deletions core/space/services/services_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,13 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do
if err != nil {
return domain.OpenFileInfo{}, err
}
if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path, dbID); exists {
listdir, err := b.ListDirectory(ctx, path)
cid := listdir.Item.Cid
if err != nil {
return domain.OpenFileInfo{}, err
}

if filePath, exists := s.sync.GetOpenFilePath(b.Slug(), path, dbID, cid); exists {
// sanity check in case file was deleted or moved
if PathExists(filePath) {
// return file handle
Expand All @@ -323,7 +329,7 @@ func (s *Space) OpenFile(ctx context.Context, path, bucketName, dbID string) (do
}

// else, open new file on FS
filePath, err = s.openFileOnFs(ctx, path, b, isRemote)
filePath, err = s.openFileOnFs(ctx, path, b, isRemote, dbID, cid)
if err != nil {
return domain.OpenFileInfo{}, err
}
Expand All @@ -349,7 +355,7 @@ func (s *Space) TruncateData(ctx context.Context) error {
return nil
}

func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket, isRemote bool) (string, error) {
func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket, isRemote bool, dbID, cid string) (string, error) {
// write file copy to temp folder
tmpFile, err := s.createTempFileForPath(ctx, path, false)
if err != nil {
Expand All @@ -364,14 +370,6 @@ func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket,
return "", err
}

threadID, err := b.GetThreadID(ctx)
if err != nil {
log.Error(fmt.Sprintf("error getting thread id for bucket %s", b.Key()), err)
return "", err
}

dbID := utils.CastDbIDToString(*threadID)

// register temp file in watcher
addWatchFile := domain.AddWatchFile{
DbId: dbID,
Expand All @@ -380,6 +378,7 @@ func (s *Space) openFileOnFs(ctx context.Context, path string, b textile.Bucket,
BucketKey: b.Key(),
BucketSlug: b.Slug(),
IsRemote: isRemote,
Cid: cid,
}

err = s.sync.AddFileWatch(addWatchFile)
Expand Down
10 changes: 9 additions & 1 deletion core/space/space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestService_OpenFile(t *testing.T) {
getDir().dir,
)

mockSync.On("GetOpenFilePath", testKey, testPath, mock.Anything).Return(
mockSync.On("GetOpenFilePath", testKey, testPath, mock.Anything, mock.Anything).Return(
"",
false,
)
Expand All @@ -366,6 +366,14 @@ func TestService_OpenFile(t *testing.T) {
"Slug",
).Return(testKey)

mockBucket.On(
"ListDirectory", mock.Anything, mock.Anything,
).Return(&bucket.DirEntries{
Item: &buckets_pb.PathItem{
Cid: "",
},
}, nil)

testThreadID, err := utils.ParseDbIDFromString("AFKRGLCIX5CQWA2244J3GBH4ERF2MLNPJWVU72BPU2BGB5OOZH5PR7Q=")
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 7 additions & 7 deletions core/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type BucketSynchronizer interface {
Shutdown() error
RegisterNotifier(notifier GrpcNotifier)
AddFileWatch(addFileInfo domain.AddWatchFile) error
GetOpenFilePath(bucketSlug string, bucketPath string, dbID string) (string, bool)
GetOpenFilePath(bucketSlug, bucketPath, dbID, cid string) (string, bool)
}

type TextileNotifier interface {
Expand Down Expand Up @@ -181,10 +181,10 @@ func (bs *bucketSynchronizer) AddFileWatch(addFileInfo domain.AddWatchFile) erro
return nil
}

func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug, bucketPath, dbID string) (string, bool) {
func (bs *bucketSynchronizer) GetOpenFilePath(bucketSlug, bucketPath, dbID, cid string) (string, bool) {
var fi domain.AddWatchFile
var err error
reversKey := getOpenFileReverseKey(bucketSlug, bucketPath, dbID)
reversKey := getOpenFileReverseKey(bucketSlug, bucketPath, dbID, cid)

if fi, err = bs.getOpenFileInfo(reversKey); err != nil {
return "", false
Expand All @@ -201,8 +201,8 @@ func getOpenFileKey(localPath string) string {
return OpenFilesKeyPrefix + localPath
}

func getOpenFileReverseKey(bucketSlug, bucketPath, dbID string) string {
return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath + ":" + dbID
func getOpenFileReverseKey(bucketSlug, bucketPath, dbID, cid string) string {
return ReverseOpenFilesKeyPrefix + bucketSlug + ":" + bucketPath + ":" + dbID + ":" + cid
}

func (bs *bucketSynchronizer) getOpenFileBucketSlugAndPath(localPath string) (domain.AddWatchFile, bool) {
Expand All @@ -228,7 +228,7 @@ func (bs *bucketSynchronizer) addFileInfoToStore(addFileInfo domain.AddWatchFile
if err := bs.store.SetString(getOpenFileKey(addFileInfo.LocalPath), string(out)); err != nil {
return err
}
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId)
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId, addFileInfo.Cid)
if err := bs.store.SetString(reverseKey, string(out)); err != nil {
return err
}
Expand All @@ -240,7 +240,7 @@ func (bs *bucketSynchronizer) removeFileInfo(addFileInfo domain.AddWatchFile) er
if err := bs.store.Remove([]byte(getOpenFileKey(addFileInfo.LocalPath))); err != nil {
return err
}
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId)
reverseKey := getOpenFileReverseKey(addFileInfo.BucketSlug, addFileInfo.BucketPath, addFileInfo.DbId, addFileInfo.Cid)
if err := bs.store.Remove([]byte(reverseKey)); err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion core/textile/event_handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package textile

import (
"github.com/FleekHQ/space-daemon/core/store"
"github.com/FleekHQ/space-daemon/core/textile/bucket"
"github.com/FleekHQ/space-daemon/core/textile/sync"
"github.com/FleekHQ/space-daemon/log"
iface "github.com/ipfs/interface-go-ipfs-core"
tc "github.com/textileio/go-threads/api/client"
)

Expand Down Expand Up @@ -31,11 +33,15 @@ func (h *defaultListenerHandler) OnSave(bucketData *bucket.BucketData, listenEve

type restorerListenerHandler struct {
synchronizer sync.Synchronizer
st store.Store
ipfsClient iface.CoreAPI
}

func newRestorerListenerHandler(synchronizer sync.Synchronizer) *restorerListenerHandler {
func newRestorerListenerHandler(synchronizer sync.Synchronizer, st store.Store, ipfsClient iface.CoreAPI) *restorerListenerHandler {
return &restorerListenerHandler{
synchronizer: synchronizer,
st: st,
ipfsClient: ipfsClient,
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/textile/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (tc *textileClient) addListener(ctx context.Context, bucketSlug string) err
if err := tc.requiresHubConnection(); err != nil {
return err
}
handler := newRestorerListenerHandler(tc.sync)
handler := newRestorerListenerHandler(tc.sync, tc.store, tc.ipfsClient)
handlers := []EventHandler{handler}
listener := NewListener(tc, bucketSlug, handlers)
tc.dbListeners[bucketSlug] = listener
Expand Down
23 changes: 18 additions & 5 deletions core/textile/secure_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,15 @@ func (s *SecureBucketClient) racePullFile(ctx context.Context, key, encPath stri
return
}

bucketPath, err := s.client.ListPath(ctx, key, encPath)
if err != nil {
cacheErrc <- err
return
}
encCid := bucketPath.Item.Cid

cidBinary := p.Cid().Bytes()
err = s.st.Set(getFileCacheKey(encPath), cidBinary)
err = s.st.Set(getFileCacheKey(encCid), cidBinary)

cacheErrc <- err
}()
Expand All @@ -409,10 +416,10 @@ func (s *SecureBucketClient) racePullFile(ctx context.Context, key, encPath stri
return nil
}

const fileCachePrefix = "file_cache"
const FileCachePrefix = "file_cache"

func getFileCacheKey(encPath string) []byte {
return []byte(fileCachePrefix + ":" + encPath)
func getFileCacheKey(encCid string) []byte {
return []byte(FileCachePrefix + ":" + encCid)
}

func (s *SecureBucketClient) pullFileFromClient(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) (shouldCache bool, err error) {
Expand All @@ -433,7 +440,13 @@ var errNoLocalClient = errors.New("No cache client available")
func (s *SecureBucketClient) pullFileFromLocal(ctx context.Context, key, encPath string, w io.Writer, opts ...bc.Option) (shouldCache bool, err error) {
shouldCache = false

cidBinary, err := s.st.Get(getFileCacheKey(encPath))
bucketPath, err := s.client.ListPath(ctx, key, encPath)
if err != nil {
return false, err
}
encCid := bucketPath.Item.Cid

cidBinary, err := s.st.Get(getFileCacheKey(encCid))
if cidBinary == nil || err != nil {
return false, errors.New("CID not stored in local cache")
}
Expand Down
14 changes: 7 additions & 7 deletions mocks/mock_syncer.go → mocks/Syncer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 63977d4

Please sign in to comment.