Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make OpenFile prevent file eviction #323

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,6 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
return err
}

// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()

inf, err := f.Stat()
if err == nil {
handle.Size = inf.Size()
Expand All @@ -858,6 +855,11 @@ func (fc *FileCache) downloadFile(handle *handlemap.Handle) error {
func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Handle, error) {
log.Trace("FileCache::OpenFile : name=%s, flags=%d, mode=%s", options.Name, options.Flags, options.Mode)

// get the file lock
flock := fc.fileLocks.Get(options.Name)
flock.Lock()
defer flock.Unlock()

attr, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: options.Name})

// return err in case of authorization permission mismatch
Expand All @@ -884,9 +886,11 @@ func (fc *FileCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Hand
}
}

// create handle and set value
// create handle and record openFileOptions for later
handle := handlemap.NewHandle(options.Name)
handle.SetValue("openFileOptions", openFileOptions{flags: options.Flags, fMode: options.Mode})
// Increment the handle count in this lock item as there is one handle open for this now
flock.Inc()

return handle, nil
}
Expand Down Expand Up @@ -919,10 +923,7 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
defer fc.fileCloseOpt.Done()

// if file has not been interactively read or written to by end user, then there is no cached file to close.
_, found := options.Handle.GetValue("openFileOptions")
if found {
return nil
}
_, noCachedHandle := options.Handle.GetValue("openFileOptions")

localPath := filepath.Join(fc.tmpPath, options.Handle.Path)

Expand All @@ -932,17 +933,20 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
return err
}

f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::closeFileInternal : error [missing fd in handle object] %s", options.Handle.Path)
return syscall.EBADF
}
if !noCachedHandle {
f := options.Handle.GetFileObject()
if f == nil {
log.Err("FileCache::closeFileInternal : error [missing fd in handle object] %s", options.Handle.Path)
return syscall.EBADF
}

err = f.Close()
if err != nil {
log.Err("FileCache::closeFileInternal : error closing file %s(%d) [%s]", options.Handle.Path, int(f.Fd()), err.Error())
return err
err = f.Close()
if err != nil {
log.Err("FileCache::closeFileInternal : error closing file %s(%d) [%s]", options.Handle.Path, int(f.Fd()), err.Error())
return err
}
}

flock.Dec()

// If it is an fsync op then purge the file
Expand All @@ -959,7 +963,7 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock
return nil
}

fc.policy.CacheInvalidate(localPath) // Invalidate the file from the local cache.
fc.policy.CacheInvalidate(localPath) // Invalidate the file from the local cache if the timeout is zero.
return nil
}

Expand Down
63 changes: 55 additions & 8 deletions component/file_cache/file_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,60 @@ func (suite *fileCacheTestSuite) TestCloseFileTimeout() {
suite.assert.True(err == nil || os.IsExist(err))
}

func (suite *fileCacheTestSuite) TestOpenCloseHandleCount() {
defer suite.cleanupTest()
// Setup
file := "file11"
handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777})
suite.assert.NoError(err)
err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
suite.assert.NoError(err)

handle, err = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: file, Mode: 0777})
suite.assert.NoError(err)
err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
suite.assert.NoError(err)

// check that flock handle count is correct
flock := suite.fileCache.fileLocks.Get(file)
suite.assert.Zero(flock.Count())
}

func (suite *fileCacheTestSuite) TestOpenPreventsEviction() {
defer suite.cleanupTest()
// Setup
suite.cleanupTest() // teardown the default file cache generated
cacheTimeout := 1
config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s",
suite.cache_path, cacheTimeout, suite.fake_storage_path)
suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual)

path := "file12"

handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777})
suite.assert.NoError(err)
err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
suite.assert.NoError(err)
// File should be in cache and cloud storage
suite.assert.FileExists(filepath.Join(suite.cache_path, path))
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path))

// Open file (this should prevent eviction)
handle, err = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Mode: 0777})
suite.assert.NoError(err)

// wait until file would be evicted (if not for being opened)
time.Sleep(time.Second * time.Duration(cacheTimeout*3))

// File should still be in cache
suite.assert.FileExists(filepath.Join(suite.cache_path, path))
suite.assert.True(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, path)))

// cleanup
err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle})
suite.assert.NoError(err)
}

func (suite *fileCacheTestSuite) TestReadInBufferEmpty() {
defer suite.cleanupTest()
// Setup
Expand Down Expand Up @@ -1169,8 +1223,6 @@ func (suite *fileCacheTestSuite) TestRenameFileInCache() {
suite.assert.NoError(err)
openHandle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: src, Mode: 0666})
suite.assert.NoError(err)
err = suite.fileCache.downloadFile(openHandle)
suite.assert.NoError(err)

// Path should be in the file cache
_, err = os.Stat(filepath.Join(suite.cache_path, src))
Expand Down Expand Up @@ -1275,11 +1327,9 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() {
createHandle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666})
suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: createHandle})
openHandle, _ := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: src, Mode: 0666})
err := suite.fileCache.downloadFile(openHandle)
suite.assert.NoError(err)

// Path should be in the file cache
_, err = os.Stat(suite.cache_path + "/" + src)
_, err := os.Stat(suite.cache_path + "/" + src)
suite.assert.True(err == nil || os.IsExist(err))
// Path should be in fake storage
_, err = os.Stat(suite.fake_storage_path + "/" + src)
Expand Down Expand Up @@ -1428,9 +1478,6 @@ func (suite *fileCacheTestSuite) TestCachePathSymlink() {

handle, _ = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: file, Mode: 0777})

err = suite.fileCache.downloadFile(handle)
suite.assert.NoError(err)

output := make([]byte, 9)
n, err := suite.fileCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: handle, Offset: 0, Data: output})
suite.assert.NoError(err)
Expand Down
Loading