Skip to content

Commit

Permalink
[#2151][#2153] Stat cache file if it is open
Browse files Browse the repository at this point in the history
If there is an open cache file, stat it instead of doing a HEAD to S3.
Fix translation of open mode to posix stream.
  • Loading branch information
JustinKyleJames committed Dec 19, 2023
1 parent 9e16552 commit 19ecdba
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 4 deletions.
35 changes: 35 additions & 0 deletions packaging/resource_suite_s3_nocache.py
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,41 @@ def test_istream_nonexistent_file__issue_6479(self):
'DATA_REPL_STATUS', '0'])
self.user0.run_icommand(['irm', '-f', logical_path])

def test_istream_append__issue_2153(self):
replica_number_in_s3 = 0
filename = 'test_istream_append__issue_2153.txt'
logical_path = os.path.join(self.user0.session_collection, filename)
content = 'this is the original file for istream'
content_append1 = '\nline1 appended to the file'
content_append2 = '\nline2 appended to the file'

try:
# istream to a new data object and ensure that it is created successfully.
self.user0.assert_icommand(['istream', 'write', logical_path], input=content)
self.assertEqual(str(1), lib.get_replica_status(self.user0, filename, replica_number_in_s3))

# append to the file
self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=f'{content_append1}')

# append to the file again
self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=f'{content_append2}')

# verify the file size and repl status in ils -l
self.user0.assert_icommand(['ils', '-l', logical_path], 'STDOUT', str(len(content) + len(content_append1) + len(content_append2)))
self.assertEqual(str(1), lib.get_replica_status(self.user0, filename, replica_number_in_s3))

# Ensure that the replica actually contains the contents streamed into it.
self.user0.assert_icommand(['istream', 'read', logical_path], 'STDOUT', [content, content_append1, content_append2])

finally:
# Set the replica status here so that we can remove the object even if it is stuck in the locked status.
print(self.user0.run_icommand(['ils', '-l'])[0]) # just debug
self.admin.run_icommand([
'iadmin', 'modrepl',
'logical_path', logical_path,
'replica_number', str(replica_number_in_s3),
'DATA_REPL_STATUS', '0'])
self.user0.run_icommand(['irm', '-f', logical_path])

def test_iput_with_invalid_secret_key_and_overwrite__issue_6154(self):
replica_number_in_s3 = 0
Expand Down
61 changes: 60 additions & 1 deletion s3/s3_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ namespace irods_s3 {
mode |= ios_base::in;
}

if ((oflag & O_TRUNC) || (oflag & O_CREAT)) {
if ((oflag & O_TRUNC)) {
mode |= ios_base::trunc;
}

Expand All @@ -450,6 +450,19 @@ namespace irods_s3 {
mode &= ~ios_base::trunc; // turn off trunc flag
}

logger::debug("{}:{} ({}) [[{}]] translated open mode is [app={}][binary={}][in={}][out={}][trunc={}][ate={}]",
__FILE__,
__LINE__,
__FUNCTION__,
thread_id,
(mode & std::ios::app) != 0,
(mode & std::ios::binary) != 0,
(mode & std::ios::in) != 0,
(mode & std::ios::out) != 0,
(mode & std::ios::trunc) != 0,
(mode & std::ios::ate) != 0
);

return mode;

}
Expand Down Expand Up @@ -1321,8 +1334,54 @@ namespace irods_s3 {
irods::plugin_context& _ctx,
struct stat* _statbuf )
{
if (!is_cacheless_mode(_ctx.prop_map())) {
return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false);
}

// cacheless mode
std::uint64_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
logger::debug("{}:{} ({}) [[{}]]", __FILE__, __LINE__, __FUNCTION__, thread_id);

// issue 2153 - Sometimes a stat is called before a close. In the case that we
// are in cacheless mode but using a local cache file, and that cache file has not yet
// been flushed, do a stat of that cache file instead of doing a HEAD to S3.
//
// We need the fd to get the transport object. Unfortunately for some reason file_obj->file_descriptor()
// is not set at this point so we will have to search through the L1desc table for
// the objPath and get the fd from that.
irods::file_object_ptr file_obj = boost::dynamic_pointer_cast<irods::file_object>(_ctx.fco());
int fd = 0;
for (int i = 0; i < NUM_L1_DESC; ++i) {
if (L1desc[i].inuseFlag && L1desc[i].dataObjInp && L1desc[i].dataObjInfo) {
if (L1desc[i].dataObjInp->objPath == file_obj->logical_path()) {
fd = i;
break;
}
}
}

if (fd_data.exists(fd)) {
per_thread_data data = fd_data.get(fd);
if (data.dstream_ptr && data.s3_transport_ptr && data.s3_transport_ptr->is_cache_file_open()) {

// do a stat on the cache file, populate stat_buf, and return
std::string cache_file_physical_path = data.s3_transport_ptr->get_cache_file_path();

const int status = stat(cache_file_physical_path.c_str(), _statbuf );

// return an error if necessary
if (status < 0) {
const int err_status = UNIX_FILE_STAT_ERR - errno;
return ERROR(err_status, fmt::format(
"Stat error for \"{}\", errno = \"{}\", status = {}.",
cache_file_physical_path.c_str(), strerror(errno), err_status));
}

return CODE(status);
}
}

// there is not an open cache file, do the normal HEAD to S3
return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false);
}

Expand Down
37 changes: 34 additions & 3 deletions s3/s3_transport/include/s3_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ namespace irods::experimental::io::s3_transport
last_file_to_close_, data.know_number_of_threads, data.threads_remaining_to_close);

// if a critical error occurred - do not flush cache file or complete multipart upload

if (!this->error_.ok()) {

return_value = false;
Expand Down Expand Up @@ -836,6 +835,30 @@ namespace irods::experimental::io::s3_transport
return existing_object_size_;
}

bool is_cache_file_open() {

if (!use_cache_) {
return false;
}

named_shared_memory_object shm_obj{shmem_key_,
config_.shared_memory_timeout_in_seconds,
constants::MAX_S3_SHMEM_SIZE};

return shm_obj.atomic_exec([](auto& data) -> bool
{
return !data.cache_file_flushed;
});

}

std::string get_cache_file_path() {
namespace bf = boost::filesystem;
bf::path cache_file = bf::path(config_.cache_directory) / bf::path(object_key_ + "-cache");
return cache_file.string();
}


private:

void set_file_offset(std::int64_t file_offset) {
Expand Down Expand Up @@ -1400,6 +1423,7 @@ namespace irods::experimental::io::s3_transport
}

if (object_status == object_s3_status::IN_S3 && this->download_to_cache_) {
logger::debug("{}:{} ({}) [[{}]] Downloading object to cache", __FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier());

cache_file_download_status download_status = this->download_object_to_cache(shm_obj, s3_object_size);

Expand Down Expand Up @@ -1458,8 +1482,15 @@ namespace irods::experimental::io::s3_transport
}

if (!cache_fstream_ || !cache_fstream_.is_open()) {
logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={}",
__FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno));
logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={} open_mode: [app={}][binary={}][in={}][out={}][trunc={}][ate={}]",
__FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno),
(mode & std::ios::app) != 0,
(mode & std::ios::binary) != 0,
(mode & std::ios::in) != 0,
(mode & std::ios::out) != 0,
(mode & std::ios::trunc) != 0,
(mode & std::ios::ate) != 0
);
this->set_error(ERROR(UNIX_FILE_OPEN_ERR, "Failed to open S3 cache file"));
return_value = false;
return;
Expand Down

0 comments on commit 19ecdba

Please sign in to comment.