From ca9eb88c796787e0f09349a2614c9a00bc23af0f Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 6 Sep 2024 17:48:57 -0600 Subject: [PATCH 01/20] planning phase --- component/file_cache/file_cache.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 1edca3be2..31ec3c102 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -579,6 +579,9 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return err } + // move cached files + // update cache policy + go fc.invalidateDirectory(options.Src) // TLDR: Dst is guaranteed to be non-existent or empty. // Note: We do not need to invalidate Dst due to the logic in our FUSE connector, see comments there. From c53e1e687c186d13340a9c9c0809d2261a8810d0 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 9 Sep 2024 18:11:48 -0600 Subject: [PATCH 02/20] Copy changes from async-cloud --- component/file_cache/file_cache.go | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 31ec3c102..46d7fed0c 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -580,6 +580,43 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { } // move cached files + localSrcPath := common.JoinUnixFilepath(fc.tmpPath, options.Src) + localDstPath := common.JoinUnixFilepath(fc.tmpPath, options.Dst) + // in case of git clone multiple rename requests come for which destination files already exists in system + // if we do not perform rename operation locally and those destination files are cached then next time they are read + // we will be serving the wrong content (as we did not rename locally, we still be having older destination files with + // stale content). We either need to remove dest file as well from cache or just run rename to replace the content. + err = os.Rename(localSrcPath, localDstPath) + if err != nil && !os.IsNotExist(err) { + log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error()) + } + + if err != nil { + // If there was a problem in local rename then delete the destination file + // it might happen that dest file was already there and local rename failed + // so deleting local dest file ensures next open of that will get the updated file from container + err = deleteFile(localDstPath) + if err != nil && !os.IsNotExist(err) { + log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localDstPath, err.Error()) + } + + fc.policy.CachePurge(localDstPath) + } + + err = deleteFile(localSrcPath) + if err != nil && !os.IsNotExist(err) { + log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localSrcPath, err.Error()) + } + + fc.policy.CachePurge(localSrcPath) + //TODO: remove cacheTimeout = 0 functionality. Doesn't work well with cloudfuses' intent + if fc.cacheTimeout == 0 { + // Destination file needs to be deleted immediately + fc.policy.CachePurge(localDstPath) + } else { + // Add destination file to cache, it will be removed on timeout + fc.policy.CacheValid(localDstPath) + } // update cache policy go fc.invalidateDirectory(options.Src) From 038af7e1ac9ba2e191da3cac8777e877c65fd171 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 10:52:53 -0600 Subject: [PATCH 03/20] Use LoadOrStore instead of inviting a race condition --- component/file_cache/lru_policy.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index 7a403aecf..a6c1c7d19 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -222,21 +222,18 @@ func (p *lruPolicy) asyncCacheValid() { } func (p *lruPolicy) cacheValidate(name string) { - var node *lruNode = nil - val, found := p.nodeMap.Load(name) - if !found { - node = &lruNode{ - name: name, - next: nil, - prev: nil, - usage: 0, - deleted: false, - } - p.nodeMap.Store(name, node) - } else { - node = val.(*lruNode) + // create a new node (in case the entry doesn't exist) + node := &lruNode{ + name: name, + next: nil, + prev: nil, + usage: 0, + deleted: false, } + // write new, or get existing entry + val, _ := p.nodeMap.LoadOrStore(name, node) + node = val.(*lruNode) p.Lock() defer p.Unlock() From f684b8f16f6e88d8888e47e0d3ec30ee3233fd55 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 10:54:41 -0600 Subject: [PATCH 04/20] tidy LoadOrStore call --- component/file_cache/lru_policy.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index a6c1c7d19..c41c98aec 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -223,17 +223,15 @@ func (p *lruPolicy) asyncCacheValid() { func (p *lruPolicy) cacheValidate(name string) { - // create a new node (in case the entry doesn't exist) - node := &lruNode{ + // write new, or get existing entry + val, _ := p.nodeMap.LoadOrStore(name, &lruNode{ name: name, next: nil, prev: nil, usage: 0, deleted: false, - } - // write new, or get existing entry - val, _ := p.nodeMap.LoadOrStore(name, node) - node = val.(*lruNode) + }) + node := val.(*lruNode) p.Lock() defer p.Unlock() From ade41dfce8facaf7acd606d2b761ef2584b36a7e Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 10:56:51 -0600 Subject: [PATCH 05/20] Add helpful comment to explain LoadOrStore --- component/file_cache/lru_policy.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index c41c98aec..6beb0a1d4 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -223,7 +223,8 @@ func (p *lruPolicy) asyncCacheValid() { func (p *lruPolicy) cacheValidate(name string) { - // write new, or get existing entry + // get existing entry, or if it doesn't exist then + // write a new one and return it val, _ := p.nodeMap.LoadOrStore(name, &lruNode{ name: name, next: nil, From fc078a0e1117588b3c7c5f5f9ee388082050ac82 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 14:19:53 -0600 Subject: [PATCH 06/20] Use recursion to preserve error on delete retry --- component/file_cache/cache_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index f7098b19c..a646d9cc4 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -109,7 +109,7 @@ func deleteFile(name string) error { return err } - err = os.Remove(name) + return deleteFile(name) } else if err != nil && os.IsNotExist(err) { log.Debug("cachePolicy::deleteFile : %s does not exist in local cache", name) return nil From 8e600e23badae93bb2976b9d9fc505c402634509 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 14:20:02 -0600 Subject: [PATCH 07/20] spelling --- component/file_cache/lru_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index 6beb0a1d4..cbe1c2863 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -440,7 +440,7 @@ func (p *lruPolicy) deleteItem(name string) { return } - // There are no open handles for this file so its safe to remove this + // There are no open handles for this file so it's safe to remove this // Check if the file exists first, since this is often the second time we're calling deleteFile _, err := os.Stat(name) if err != nil && os.IsNotExist(err) { From d5c028933024170ef7071c6dc4d7061b24638cfc Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 10 Sep 2024 14:20:24 -0600 Subject: [PATCH 08/20] Separate local cache RenameFile code into a function for reuse. --- component/file_cache/file_cache.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 46d7fed0c..8f48b7d9e 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1305,7 +1305,13 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { // if we do not perform rename operation locally and those destination files are cached then next time they are read // we will be serving the wrong content (as we did not rename locally, we still be having older destination files with // stale content). We either need to remove dest file as well from cache or just run rename to replace the content. - err = os.Rename(localSrcPath, localDstPath) + fc.renameCachedFile(localSrcPath, localDstPath) + + return nil +} + +func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) { + err := os.Rename(localSrcPath, localDstPath) if err != nil && !os.IsNotExist(err) { log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error()) } @@ -1336,8 +1342,6 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { // Add destination file to cache, it will be removed on timeout fc.policy.CacheValid(localDstPath) } - - return nil } // TruncateFile: Update the file with its new size. From af136e1e0e23422c06c3323d48638ac160bdd902 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 16:16:18 -0600 Subject: [PATCH 09/20] Simplify invalidateDirectory() --- component/file_cache/file_cache.go | 32 ++++++++++++++---------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 8f48b7d9e..b85cef862 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -410,31 +410,29 @@ func (fc *FileCache) invalidateDirectory(name string) { log.Trace("FileCache::invalidateDirectory : %s", name) localPath := filepath.Join(fc.tmpPath, name) - _, err := os.Stat(localPath) - if os.IsNotExist(err) { - log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name) - return - } else if err != nil { - log.Debug("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error()) - return - } // TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' - // Save the paths in lexical order and delete them in reverse order so folders are deleted after their children - var pathsToPurge []string - err = filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { + err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { if err == nil && d != nil { - pathsToPurge = append(pathsToPurge, path) + log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", path) + if !d.IsDir() { + fc.policy.CachePurge(path) + } else { + // defer directory deletion (after its children are gone) + defer fc.policy.CachePurge(path) + } + } else { + if os.IsNotExist(err) { + log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name) + } else if err != nil { + log.Warn("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error()) + } } return nil }) - for i := len(pathsToPurge) - 1; i >= 0; i-- { - log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", pathsToPurge[i]) - fc.policy.CachePurge(pathsToPurge[i]) - } if err != nil { - log.Debug("FileCache::invalidateDirectory : Failed to iterate directory %s [%s].", localPath, err.Error()) + log.Debug("FileCache::invalidateDirectory : Failed to walk directory %s. Here's why: %v", localPath, err) return } } From c6da4bc6a2e3b926f69ccd8f409e4e33c61a78a4 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 16:17:11 -0600 Subject: [PATCH 10/20] Make fs request wait for local files to be deleted - to prevent race conditions. --- component/file_cache/file_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index b85cef862..1e9bb0295 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -452,7 +452,7 @@ func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error { // rest api delete will fail while we still need to cleanup the local cache for the same } - go fc.invalidateDirectory(options.Name) + fc.invalidateDirectory(options.Name) return err } From 144b6b7863b2515725f5bac43668fb37d046d2c5 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 16:17:43 -0600 Subject: [PATCH 11/20] Rewrite RenameDir to move local files --- component/file_cache/file_cache.go | 77 ++++++++++++++---------------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 1e9bb0295..262f5f76d 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -567,7 +567,7 @@ func (fc *FileCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool { return fc.NextComponent().IsDirEmpty(options) } -// RenameDir: Recursively invalidate the source directory and its children +// RenameDir: Recursively move the source directory func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst) @@ -577,49 +577,46 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return err } - // move cached files + // updated local storage localSrcPath := common.JoinUnixFilepath(fc.tmpPath, options.Src) + // move the files localDstPath := common.JoinUnixFilepath(fc.tmpPath, options.Dst) - // in case of git clone multiple rename requests come for which destination files already exists in system - // if we do not perform rename operation locally and those destination files are cached then next time they are read - // we will be serving the wrong content (as we did not rename locally, we still be having older destination files with - // stale content). We either need to remove dest file as well from cache or just run rename to replace the content. - err = os.Rename(localSrcPath, localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error()) - } - - if err != nil { - // If there was a problem in local rename then delete the destination file - // it might happen that dest file was already there and local rename failed - // so deleting local dest file ensures next open of that will get the updated file from container - err = deleteFile(localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localDstPath, err.Error()) + // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' + _ = filepath.WalkDir(localSrcPath, func(path string, d fs.DirEntry, err error) error { + if err == nil && d != nil { + newPath := strings.Replace(path, localSrcPath, localDstPath, 1) + if !d.IsDir() { + renameErr := os.Rename(path, newPath) + if renameErr != nil { + // if rename fails, we just delete the source file anyway + log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", path, newPath, renameErr) + } else { + fc.policy.CacheValid(newPath) + } + // delete the source from our cache policy + // this will also delete the source file from local storage (if rename failed) + fc.policy.CachePurge(path) + } else { + // create the new directory + mkdirErr := os.MkdirAll(newPath, fc.defaultPermission) + if mkdirErr != nil { + // log any error but do nothing about it + log.Warn("FileCache::RenameDir : Failed to created directory %s. Here's why: %v", newPath, mkdirErr) + } + // defer deleting the src directory (until after its contents are deleted) + defer fc.policy.CachePurge(path) + } + } else { + // none of the files that were moved actually exist in local storage + if os.IsNotExist(err) { + log.Info("FileCache::RenameDir : %s does not exist in local cache.", options.Src) + } else if err != nil { + log.Warn("FileCache::RenameDir : %s stat err [%v].", options.Src, err) + } } + return nil + }) - fc.policy.CachePurge(localDstPath) - } - - err = deleteFile(localSrcPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localSrcPath, err.Error()) - } - - fc.policy.CachePurge(localSrcPath) - //TODO: remove cacheTimeout = 0 functionality. Doesn't work well with cloudfuses' intent - if fc.cacheTimeout == 0 { - // Destination file needs to be deleted immediately - fc.policy.CachePurge(localDstPath) - } else { - // Add destination file to cache, it will be removed on timeout - fc.policy.CacheValid(localDstPath) - } - // update cache policy - - go fc.invalidateDirectory(options.Src) - // TLDR: Dst is guaranteed to be non-existent or empty. - // Note: We do not need to invalidate Dst due to the logic in our FUSE connector, see comments there. return nil } From 414dea7d3b256c2de33d7dbbddf37ea09c606702 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 16:36:58 -0600 Subject: [PATCH 12/20] Fix defer bug (walkdir function was trapping defer) --- component/file_cache/file_cache.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 262f5f76d..68063c77f 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -412,14 +412,15 @@ func (fc *FileCache) invalidateDirectory(name string) { localPath := filepath.Join(fc.tmpPath, name) // TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' + var directoriesToPurge []string err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { if err == nil && d != nil { log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", path) if !d.IsDir() { fc.policy.CachePurge(path) } else { - // defer directory deletion (after its children are gone) - defer fc.policy.CachePurge(path) + // remember to delete the directory later (after its children) + directoriesToPurge = append(directoriesToPurge, path) } } else { if os.IsNotExist(err) { @@ -431,6 +432,11 @@ func (fc *FileCache) invalidateDirectory(name string) { return nil }) + // clean up leftover source directories in reverse order + for i := len(directoriesToPurge); i >= 0; i-- { + fc.policy.CachePurge(directoriesToPurge[i]) + } + if err != nil { log.Debug("FileCache::invalidateDirectory : Failed to walk directory %s. Here's why: %v", localPath, err) return @@ -582,10 +588,12 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { // move the files localDstPath := common.JoinUnixFilepath(fc.tmpPath, options.Dst) // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' + var directoriesToPurge []string _ = filepath.WalkDir(localSrcPath, func(path string, d fs.DirEntry, err error) error { if err == nil && d != nil { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { + log.Debug("FileCache::RenameDir : Renaming local file %s -> %s...", path, newPath) renameErr := os.Rename(path, newPath) if renameErr != nil { // if rename fails, we just delete the source file anyway @@ -597,14 +605,15 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { // this will also delete the source file from local storage (if rename failed) fc.policy.CachePurge(path) } else { + log.Debug("FileCache::RenameDir : Renaming local directory %s -> %s...", path, newPath) // create the new directory mkdirErr := os.MkdirAll(newPath, fc.defaultPermission) if mkdirErr != nil { // log any error but do nothing about it log.Warn("FileCache::RenameDir : Failed to created directory %s. Here's why: %v", newPath, mkdirErr) } - // defer deleting the src directory (until after its contents are deleted) - defer fc.policy.CachePurge(path) + // remember to delete the src directory later (after its contents are deleted) + directoriesToPurge = append(directoriesToPurge, path) } } else { // none of the files that were moved actually exist in local storage @@ -617,6 +626,11 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return nil }) + // clean up leftover source directories in reverse order + for i := len(directoriesToPurge); i >= 0; i-- { + fc.policy.CachePurge(directoriesToPurge[i]) + } + return nil } From 3e1c426bfd009b138660b70789de61f32f2ff15d Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 17:20:09 -0600 Subject: [PATCH 13/20] Fix OBO error --- component/file_cache/file_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 68063c77f..e1666c5fe 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -433,7 +433,7 @@ func (fc *FileCache) invalidateDirectory(name string) { }) // clean up leftover source directories in reverse order - for i := len(directoriesToPurge); i >= 0; i-- { + for i := len(directoriesToPurge) - 1; i >= 0; i-- { fc.policy.CachePurge(directoriesToPurge[i]) } @@ -627,7 +627,7 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { }) // clean up leftover source directories in reverse order - for i := len(directoriesToPurge); i >= 0; i-- { + for i := len(directoriesToPurge) - 1; i >= 0; i-- { fc.policy.CachePurge(directoriesToPurge[i]) } From b035d5ff52677cca590a5e6f75294884eaa9a7ba Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 17:37:56 -0600 Subject: [PATCH 14/20] Test new RenameDir functionality --- component/file_cache/file_cache_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 92764e1bb..82febe97f 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -561,11 +561,18 @@ func (suite *fileCacheTestSuite) TestRenameDir() { suite.assert.False(suite.fileCache.policy.IsCached(src)) // Directory should not be cached // wait for asynchronous deletion time.Sleep(1 * time.Second) - // directory should not exist in local filesystem + // src directory should not exist in local filesystem fInfo, err := os.Stat(filepath.Join(suite.cache_path, src)) suite.assert.Nil(fInfo) suite.assert.Error(err) suite.assert.True(os.IsNotExist(err)) + // dst directory should exist and have contents from src + dstEntries, err := os.ReadDir(filepath.Join(suite.cache_path, dst)) + suite.assert.NoError(err) + suite.assert.Len(dstEntries, 5) + for i, entry := range dstEntries { + suite.assert.Equal("file"+strconv.Itoa(i), entry.Name()) + } } func (suite *fileCacheTestSuite) TestCreateFile() { From 7dd499216764700ea5f4bddd9986850090fc2a51 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 18:04:25 -0600 Subject: [PATCH 15/20] Add comments and rewrite renamefile --- component/file_cache/cache_policy.go | 2 +- component/file_cache/file_cache.go | 53 ++++++++++------------------ 2 files changed, 19 insertions(+), 36 deletions(-) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index a646d9cc4..cd2a54bdc 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -108,7 +108,7 @@ func deleteFile(name string) error { log.Err("cachePolicy::deleteFile : %s failed to reset permissions", name) return err } - + // recurse so any further errors can be handled or returned return deleteFile(name) } else if err != nil && os.IsNotExist(err) { log.Debug("cachePolicy::deleteFile : %s does not exist in local cache", name) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index e1666c5fe..7cbe1b58b 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -423,6 +423,8 @@ func (fc *FileCache) invalidateDirectory(name string) { directoriesToPurge = append(directoriesToPurge, path) } } else { + // stat(localPath) failed. err is the one returned by stat + // documentation: https://pkg.go.dev/io/fs#WalkDirFunc if os.IsNotExist(err) { log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name) } else if err != nil { @@ -583,9 +585,8 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return err } - // updated local storage + // move the files in local storage localSrcPath := common.JoinUnixFilepath(fc.tmpPath, options.Src) - // move the files localDstPath := common.JoinUnixFilepath(fc.tmpPath, options.Dst) // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' var directoriesToPurge []string @@ -594,16 +595,7 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { log.Debug("FileCache::RenameDir : Renaming local file %s -> %s...", path, newPath) - renameErr := os.Rename(path, newPath) - if renameErr != nil { - // if rename fails, we just delete the source file anyway - log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", path, newPath, renameErr) - } else { - fc.policy.CacheValid(newPath) - } - // delete the source from our cache policy - // this will also delete the source file from local storage (if rename failed) - fc.policy.CachePurge(path) + fc.renameCachedFile(path, newPath) } else { log.Debug("FileCache::RenameDir : Renaming local directory %s -> %s...", path, newPath) // create the new directory @@ -616,8 +608,10 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { directoriesToPurge = append(directoriesToPurge, path) } } else { - // none of the files that were moved actually exist in local storage + // stat(localPath) failed. err is the one returned by stat + // documentation: https://pkg.go.dev/io/fs#WalkDirFunc if os.IsNotExist(err) { + // none of the files that were moved actually exist in local storage log.Info("FileCache::RenameDir : %s does not exist in local cache.", options.Src) } else if err != nil { log.Warn("FileCache::RenameDir : %s stat err [%v].", options.Src, err) @@ -631,6 +625,11 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { fc.policy.CachePurge(directoriesToPurge[i]) } + if fc.cacheTimeout == 0 { + // delete destination path immediately + fc.invalidateDirectory(options.Dst) + } + return nil } @@ -1321,35 +1320,19 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) { err := os.Rename(localSrcPath, localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error()) - } - if err != nil { - // If there was a problem in local rename then delete the destination file - // it might happen that dest file was already there and local rename failed - // so deleting local dest file ensures next open of that will get the updated file from container - err = deleteFile(localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localDstPath, err.Error()) - } - - fc.policy.CachePurge(localDstPath) - } - - err = deleteFile(localSrcPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localSrcPath, err.Error()) + // if rename fails, we just delete the source file anyway + log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", localSrcPath, localDstPath, err) + } else { + fc.policy.CacheValid(localDstPath) } - + // delete the source from our cache policy + // this will also delete the source file from local storage (if rename failed) fc.policy.CachePurge(localSrcPath) if fc.cacheTimeout == 0 { // Destination file needs to be deleted immediately fc.policy.CachePurge(localDstPath) - } else { - // Add destination file to cache, it will be removed on timeout - fc.policy.CacheValid(localDstPath) } } From 6f1b0159b333f686fc4cd398207a6aabfc9d17d5 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 12 Sep 2024 18:07:36 -0600 Subject: [PATCH 16/20] Do purges for timeout=0 asynchronously --- component/file_cache/file_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 7cbe1b58b..4e44292a1 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -627,7 +627,7 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { if fc.cacheTimeout == 0 { // delete destination path immediately - fc.invalidateDirectory(options.Dst) + go fc.invalidateDirectory(options.Dst) } return nil @@ -1332,7 +1332,7 @@ func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) if fc.cacheTimeout == 0 { // Destination file needs to be deleted immediately - fc.policy.CachePurge(localDstPath) + go fc.policy.CachePurge(localDstPath) } } From d1da913d3bb984ab2a04121a7cc2dc1a9b3bbde8 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 13 Sep 2024 12:51:09 -0600 Subject: [PATCH 17/20] Add more logging --- component/file_cache/file_cache.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 4e44292a1..0fbe011df 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -415,8 +415,8 @@ func (fc *FileCache) invalidateDirectory(name string) { var directoriesToPurge []string err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { if err == nil && d != nil { - log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", path) if !d.IsDir() { + log.Debug("FileCache::invalidateDirectory : removing file %s from cache", path) fc.policy.CachePurge(path) } else { // remember to delete the directory later (after its children) @@ -436,6 +436,7 @@ func (fc *FileCache) invalidateDirectory(name string) { // clean up leftover source directories in reverse order for i := len(directoriesToPurge) - 1; i >= 0; i-- { + log.Debug("FileCache::invalidateDirectory : removing dir %s from cache", directoriesToPurge[i]) fc.policy.CachePurge(directoriesToPurge[i]) } @@ -594,10 +595,10 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { if err == nil && d != nil { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { - log.Debug("FileCache::RenameDir : Renaming local file %s -> %s...", path, newPath) + log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) fc.renameCachedFile(path, newPath) } else { - log.Debug("FileCache::RenameDir : Renaming local directory %s -> %s...", path, newPath) + log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath) // create the new directory mkdirErr := os.MkdirAll(newPath, fc.defaultPermission) if mkdirErr != nil { @@ -622,11 +623,13 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { // clean up leftover source directories in reverse order for i := len(directoriesToPurge) - 1; i >= 0; i-- { + log.Debug("FileCache::RenameDir : Removing local directory %s", directoriesToPurge[i]) fc.policy.CachePurge(directoriesToPurge[i]) } if fc.cacheTimeout == 0 { // delete destination path immediately + log.Info("FileCache::RenameDir : Timeout is zero, so removing local destination %s", options.Dst) go fc.invalidateDirectory(options.Dst) } From 3f51cb83e9874b41608a3bb9d9853dcde7fda8e1 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 13 Sep 2024 13:56:07 -0600 Subject: [PATCH 18/20] Do not use unix path join on local paths --- component/file_cache/file_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 0fbe011df..65c41abb3 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -587,8 +587,8 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { } // move the files in local storage - localSrcPath := common.JoinUnixFilepath(fc.tmpPath, options.Src) - localDstPath := common.JoinUnixFilepath(fc.tmpPath, options.Dst) + localSrcPath := filepath.Join(fc.tmpPath, options.Src) + localDstPath := filepath.Join(fc.tmpPath, options.Dst) // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' var directoriesToPurge []string _ = filepath.WalkDir(localSrcPath, func(path string, d fs.DirEntry, err error) error { From d69cf7c94d1bcda7d7c5cf556e17433a26542207 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 18 Sep 2024 14:43:58 -0600 Subject: [PATCH 19/20] Revert trivial change to deleteFile --- component/file_cache/cache_policy.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index c468250ca..3fe656410 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -108,8 +108,7 @@ func deleteFile(name string) error { log.Err("cachePolicy::deleteFile : %s failed to reset permissions", name) return err } - // recurse so any further errors can be handled or returned - return deleteFile(name) + err = os.Remove(name) } else if err != nil && os.IsNotExist(err) { log.Debug("cachePolicy::deleteFile : %s does not exist in local cache", name) return nil From 967dd34e63c5eaa050e0a7883007d6319c774252 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 18 Sep 2024 14:44:43 -0600 Subject: [PATCH 20/20] fix diff --- component/file_cache/cache_policy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index 3fe656410..58f5a736b 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -108,6 +108,7 @@ func deleteFile(name string) error { log.Err("cachePolicy::deleteFile : %s failed to reset permissions", name) return err } + err = os.Remove(name) } else if err != nil && os.IsNotExist(err) { log.Debug("cachePolicy::deleteFile : %s does not exist in local cache", name)