Skip to content

Commit

Permalink
Merge branch 'main' into update-blobfuse2.3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
jfantinhardesty committed Sep 18, 2024
2 parents bbd6aa4 + cb2d856 commit b6119c7
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 133 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/code-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
BuildAndTest-Coverage:
strategy:
matrix:
go: ['1.22']
go: ['1.23']
job_name: ['linux']

include:
Expand Down Expand Up @@ -521,7 +521,7 @@ jobs:
BuildAndTest-Coverage-Windows:
strategy:
matrix:
go: ['1.22']
go: ['1.23']
job_name: ['windows']

include:
Expand Down Expand Up @@ -1005,7 +1005,7 @@ jobs:
- BuildAndTest-Coverage-Windows
strategy:
matrix:
go: ['1.22']
go: ['1.23']
job_name: ['linux']

include:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
if: matrix.language == 'go'
uses: actions/setup-go@v5
with:
go-version: '1.22'
go-version: '1.23'
check-latest: true

- name: Go Version
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
needs: compile-gui
runs-on: windows-latest
env:
go: '1.22'
go: '1.23'
cgo: '0'
winfsp: winfsp-2.0.23075.msi
steps:
Expand Down Expand Up @@ -209,7 +209,7 @@ jobs:
needs: create-installer
runs-on: ubuntu-latest
env:
go: '1.22'
go: '1.23'
zig: 0.13.0

steps:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
name: Build and Test on Linux
runs-on: ubuntu-latest
env:
go: '1.22'
go: '1.23'
cgo: ''
containerName: 'test-cnt-ubn'

Expand Down Expand Up @@ -117,7 +117,7 @@ jobs:
name: Build and Test on Windows
runs-on: windows-latest
env:
go: '1.22'
go: '1.23'
cgo: '0'
containerName: 'test-cnt-win'

Expand Down Expand Up @@ -149,7 +149,7 @@ jobs:
name: Lint
runs-on: ubuntu-latest
env:
go: '1.22'
go: '1.23'
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
171 changes: 107 additions & 64 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,31 +390,38 @@ func (fc *FileCache) invalidateDirectory(name string) {
log.Trace("FileCache::invalidateDirectory : %s", name)

localPath := filepath.Join(fc.tmpPath, name)
_, err := os.Stat(localPath)
if os.IsNotExist(err) {
log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name)
return
} else if err != nil {
log.Debug("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error())
return
}
// TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent
// WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file'
// Save the paths in lexical order and delete them in reverse order so folders are deleted after their children
var pathsToPurge []string
err = filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error {
var directoriesToPurge []string
err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error {
if err == nil && d != nil {
pathsToPurge = append(pathsToPurge, path)
if !d.IsDir() {
log.Debug("FileCache::invalidateDirectory : removing file %s from cache", path)
fc.policy.CachePurge(path)
} else {
// remember to delete the directory later (after its children)
directoriesToPurge = append(directoriesToPurge, path)
}
} else {
// stat(localPath) failed. err is the one returned by stat
// documentation: https://pkg.go.dev/io/fs#WalkDirFunc
if os.IsNotExist(err) {
log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name)
} else if err != nil {
log.Warn("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error())
}
}
return nil
})
for i := len(pathsToPurge) - 1; i >= 0; i-- {
log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", pathsToPurge[i])
fc.policy.CachePurge(pathsToPurge[i])

// clean up leftover source directories in reverse order
for i := len(directoriesToPurge) - 1; i >= 0; i-- {
log.Debug("FileCache::invalidateDirectory : removing dir %s from cache", directoriesToPurge[i])
fc.policy.CachePurge(directoriesToPurge[i])
}

if err != nil {
log.Debug("FileCache::invalidateDirectory : Failed to iterate directory %s [%s].", localPath, err.Error())
log.Debug("FileCache::invalidateDirectory : Failed to walk directory %s. Here's why: %v", localPath, err)
return
}
}
Expand All @@ -434,7 +441,7 @@ func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error {
// rest api delete will fail while we still need to cleanup the local cache for the same
}

go fc.invalidateDirectory(options.Name)
fc.invalidateDirectory(options.Name)
return err
}

Expand Down Expand Up @@ -549,7 +556,7 @@ func (fc *FileCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool {
return fc.NextComponent().IsDirEmpty(options)
}

// RenameDir: Recursively invalidate the source directory and its children
// RenameDir: Recursively move the source directory
func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst)

Expand All @@ -559,9 +566,53 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
return err
}

go fc.invalidateDirectory(options.Src)
// TLDR: Dst is guaranteed to be non-existent or empty.
// Note: We do not need to invalidate Dst due to the logic in our FUSE connector, see comments there.
// move the files in local storage
localSrcPath := filepath.Join(fc.tmpPath, options.Src)
localDstPath := filepath.Join(fc.tmpPath, options.Dst)
// WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file'
var directoriesToPurge []string
_ = filepath.WalkDir(localSrcPath, func(path string, d fs.DirEntry, err error) error {
if err == nil && d != nil {
newPath := strings.Replace(path, localSrcPath, localDstPath, 1)
if !d.IsDir() {
log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath)
fc.renameCachedFile(path, newPath)
} else {
log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath)
// create the new directory
mkdirErr := os.MkdirAll(newPath, fc.defaultPermission)
if mkdirErr != nil {
// log any error but do nothing about it
log.Warn("FileCache::RenameDir : Failed to created directory %s. Here's why: %v", newPath, mkdirErr)
}
// remember to delete the src directory later (after its contents are deleted)
directoriesToPurge = append(directoriesToPurge, path)
}
} else {
// stat(localPath) failed. err is the one returned by stat
// documentation: https://pkg.go.dev/io/fs#WalkDirFunc
if os.IsNotExist(err) {
// none of the files that were moved actually exist in local storage
log.Info("FileCache::RenameDir : %s does not exist in local cache.", options.Src)
} else if err != nil {
log.Warn("FileCache::RenameDir : %s stat err [%v].", options.Src, err)
}
}
return nil
})

// clean up leftover source directories in reverse order
for i := len(directoriesToPurge) - 1; i >= 0; i-- {
log.Debug("FileCache::RenameDir : Removing local directory %s", directoriesToPurge[i])
fc.policy.CachePurge(directoriesToPurge[i])
}

if fc.cacheTimeout == 0 {
// delete destination path immediately
log.Info("FileCache::RenameDir : Timeout is zero, so removing local destination %s", options.Dst)
go fc.invalidateDirectory(options.Dst)
}

return nil
}

Expand Down Expand Up @@ -812,9 +863,6 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
return err
}

// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()

inf, err := f.Stat()
if err == nil {
handle.Size = inf.Size()
Expand All @@ -838,6 +886,11 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error) {
log.Trace("FileCache::OpenFile : name=%s, flags=%d, mode=%s", options.Name, options.Flags, options.Mode)

// get the file lock
flock := fc.fileLocks.Get(options.Name)
flock.Lock()
defer flock.Unlock()

attr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: options.Name})

// return err in case of authorization permission mismatch
Expand All @@ -864,9 +917,11 @@ func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Hand
}
}

// create handle and set value
// create handle and record openFileOptions for later
handle := handlemap.NewHandle(options.Name)
handle.SetValue("openFileOptions", openFileOptions{flags: options.Flags, fMode: options.Mode})
// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()

return handle, nil
}
Expand Down Expand Up @@ -899,10 +954,7 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
defer fc.fileCloseOpt.Done()

// if file has not been interactively read or written to by end user, then there is no cached file to close.
_, found := options.Handle.GetValue("openFileOptions")
if found {
return nil
}
_, noCachedHandle := options.Handle.GetValue("openFileOptions")

localPath := filepath.Join(fc.tmpPath, options.Handle.Path)

Expand All @@ -912,17 +964,20 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
return err
}

f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::closeFileInternal : error [missing fd in handle object] %s", options.Handle.Path)
return syscall.EBADF
}
if !noCachedHandle {
f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::closeFileInternal : error [missing fd in handle object] %s", options.Handle.Path)
return syscall.EBADF
}

err = f.Close()
if err != nil {
log.Err("FileCache::closeFileInternal : error closing file %s(%d) [%s]", options.Handle.Path, int(f.Fd()), err.Error())
return err
err = f.Close()
if err != nil {
log.Err("FileCache::closeFileInternal : error closing file %s(%d) [%s]", options.Handle.Path, int(f.Fd()), err.Error())
return err
}
}

flock.Dec()

// If it is an fsync op then purge the file
Expand All @@ -939,7 +994,7 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
return nil
}

fc.policy.CacheInvalidate(localPath) // Invalidate the file from the local cache.
fc.policy.CacheInvalidate(localPath) // Invalidate the file from the local cache if the timeout is zero.
return nil
}

Expand Down Expand Up @@ -1269,39 +1324,27 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error {
// if we do not perform rename operation locally and those destination files are cached then next time they are read
// we will be serving the wrong content (as we did not rename locally, we still be having older destination files with
// stale content). We either need to remove dest file as well from cache or just run rename to replace the content.
err = os.Rename(localSrcPath, localDstPath)
if err != nil && !os.IsNotExist(err) {
log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error())
}

if err != nil {
// If there was a problem in local rename then delete the destination file
// it might happen that dest file was already there and local rename failed
// so deleting local dest file ensures next open of that will get the updated file from container
err = deleteFile(localDstPath)
if err != nil && !os.IsNotExist(err) {
log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localDstPath, err.Error())
}
fc.renameCachedFile(localSrcPath, localDstPath)

fc.policy.CachePurge(localDstPath)
}
return nil
}

err = deleteFile(localSrcPath)
if err != nil && !os.IsNotExist(err) {
log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localSrcPath, err.Error())
func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) {
err := os.Rename(localSrcPath, localDstPath)
if err != nil {
// if rename fails, we just delete the source file anyway
log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", localSrcPath, localDstPath, err)
} else {
fc.policy.CacheValid(localDstPath)
}

// delete the source from our cache policy
// this will also delete the source file from local storage (if rename failed)
fc.policy.CachePurge(localSrcPath)

if fc.cacheTimeout == 0 {
// Destination file needs to be deleted immediately
fc.policy.CachePurge(localDstPath)
} else {
// Add destination file to cache, it will be removed on timeout
fc.policy.CacheValid(localDstPath)
go fc.policy.CachePurge(localDstPath)
}

return nil
}

// TruncateFile: Update the file with its new size.
Expand Down
Loading

0 comments on commit b6119c7

Please sign in to comment.