From 2ff4592ce491b28ac64e49ae2be253c94f2fa32c Mon Sep 17 00:00:00 2001 From: Justin James Date: Thu, 30 Nov 2023 17:35:15 -0500 Subject: [PATCH] [2151][2153] Stat cache file if it is open If there is an open cache file, stat it instead of doing a HEAD to S3. Fix translation of open mode to posix stream. --- packaging/resource_suite_s3_nocache.py | 33 +++++++++++ s3/s3_operations.cpp | 70 ++++++++++++++++++++++-- s3/s3_transport/include/s3_transport.hpp | 36 +++++++++++- 3 files changed, 132 insertions(+), 7 deletions(-) diff --git a/packaging/resource_suite_s3_nocache.py b/packaging/resource_suite_s3_nocache.py index 432ebb4f..3cdb0522 100644 --- a/packaging/resource_suite_s3_nocache.py +++ b/packaging/resource_suite_s3_nocache.py @@ -1910,6 +1910,39 @@ def test_istream_nonexistent_file__issue_6479(self): 'DATA_REPL_STATUS', '0']) self.user0.run_icommand(['irm', '-f', logical_path]) + def test_istream_append__issue_2153(self): + replica_number_in_s3 = 0 + filename = 'test_istream_append__issue_2153.txt' + logical_path = os.path.join(self.user0.session_collection, filename) + content = 'this is the original file for istream' + content_append1 = 'line1 appended to the file' + content_append2 = 'line2 appended to the file' + + try: + # istream to a new data object and ensure that it is created successfully. 
+ self.user0.assert_icommand(['istream', 'write', logical_path], input=content) + self.assertEqual(str(1), lib.get_replica_status(self.user0, filename, replica_number_in_s3)) + + # append to the file + self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=f'\n{content_append1}') + + # append to the file again + self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=f'\n{content_append2}') + + # verify the file size in ils -l - added two newlines above thus the +2 below + self.user0.assert_icommand(['ils', '-l', logical_path], 'STDOUT', str(len(content) + len(content_append1) + len(content_append2)+ 2)) + + # Ensure that the replica actually contains the contents streamed into it. + self.user0.assert_icommand(['iget', logical_path, '-'], 'STDOUT_MULTILINE', [content, content_append1, content_append2]) + + finally: + # Set the replica status here so that we can remove the object even if it is stuck in the locked status. + self.admin.run_icommand([ + 'iadmin', 'modrepl', + 'logical_path', logical_path, + 'replica_number', str(replica_number_in_s3), + 'DATA_REPL_STATUS', '0']) + self.user0.run_icommand(['irm', '-f', logical_path]) def test_iput_with_invalid_secret_key_and_overwrite__issue_6154(self): replica_number_in_s3 = 0 diff --git a/s3/s3_operations.cpp b/s3/s3_operations.cpp index f5e6c5ba..7923fd8b 100644 --- a/s3/s3_operations.cpp +++ b/s3/s3_operations.cpp @@ -441,7 +441,7 @@ namespace irods_s3 { mode |= ios_base::in; } - if ((oflag & O_TRUNC) || (oflag & O_CREAT)) { + if ((oflag & O_TRUNC)) { mode |= ios_base::trunc; } @@ -450,6 +450,19 @@ namespace irods_s3 { mode &= ~ios_base::trunc; // turn off trunc flag } + logger::debug("{}:{} ({}) [[{}]] translated open mode is [app={}][binary={}][in={}][out={}][trunc={}][ate={}]", + __FILE__, + __LINE__, + __FUNCTION__, + thread_id, + (mode & std::ios::app) != 0, + (mode & std::ios::binary) != 0, + (mode & std::ios::in) != 0, + (mode & std::ios::out) != 0, + (mode & 
std::ios::trunc) != 0, + (mode & std::ios::ate) != 0 + ); + return mode; } @@ -1321,9 +1334,58 @@ namespace irods_s3 { irods::plugin_context& _ctx, struct stat* _statbuf ) { - std::uint64_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id()); - logger::debug("{}:{} ({}) [[{}]]", __FILE__, __LINE__, __FUNCTION__, thread_id); - return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false); + if (is_cacheless_mode(_ctx.prop_map())) { + + std::uint64_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id()); + logger::debug("{}:{} ({}) [[{}]]", __FILE__, __LINE__, __FUNCTION__, thread_id); + + // issue 2153 - Sometimes a stat is called before a close. In the case that we + // are in cacheless mode but using a local cache file, and that cache file has not yet + // been flushed, do a stat of that cache file instead of doing a HEAD to S3. + // + // We need the fd to get the transport object. Unfortunately for some reason file_obj->file_descriptor() + // is not set at this point so we will have to search through the L1desc table for + // the objPath and get the fd from that. 
+ irods::file_object_ptr file_obj = boost::dynamic_pointer_cast<irods::file_object>(_ctx.fco()); + int fd = 0; + for (int i = 0; i < NUM_L1_DESC; ++i) { + if (L1desc[i].inuseFlag && L1desc[i].dataObjInp && L1desc[i].dataObjInfo) { + if (L1desc[i].dataObjInp->objPath == file_obj->logical_path()) { + fd = i; + break; + } + } + } + + if (fd_data.exists(fd)) { + per_thread_data data = fd_data.get(fd); + if (data.dstream_ptr && data.s3_transport_ptr && data.s3_transport_ptr->is_open_cache_file()) { + + // do a stat on the cache file, populate stat_buf, and return + std::string cache_file_physical_path = data.s3_transport_ptr->get_cache_file_path(); + + const int status = stat(cache_file_physical_path.c_str(), _statbuf ); + + // return an error if necessary + if (status < 0) { + const int err_status = UNIX_FILE_STAT_ERR - errno; + return ERROR(err_status, fmt::format( + "Stat error for \"{}\", errno = \"{}\", status = {}.", + cache_file_physical_path.c_str(), strerror(errno), err_status)); + } + + auto result = SUCCESS(); + result.code(status); + return result; + } + } + + // there is not an open cache file, do the normal HEAD to S3 + return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false); + } else { + return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false); + } + } // =-=-=-=-=-=-=- diff --git a/s3/s3_transport/include/s3_transport.hpp b/s3/s3_transport/include/s3_transport.hpp index ac28355f..2436caaf 100644 --- a/s3/s3_transport/include/s3_transport.hpp +++ b/s3/s3_transport/include/s3_transport.hpp @@ -449,7 +449,6 @@ namespace irods::experimental::io::s3_transport last_file_to_close_, data.know_number_of_threads, data.threads_remaining_to_close); // if a critical error occurred - do not flush cache file or complete multipart upload - if (!this->error_.ok()) { return_value = false; @@ -836,6 +835,29 @@ namespace irods::experimental::io::s3_transport return existing_object_size_; } + bool is_open_cache_file() { + if 
(use_cache_) { + + named_shared_memory_object shm_obj{shmem_key_, + config_.shared_memory_timeout_in_seconds, + constants::MAX_S3_SHMEM_SIZE}; + + return shm_obj.atomic_exec([](auto& data) -> bool + { + return !data.cache_file_flushed; + }); + + } + return false; + } + + std::string get_cache_file_path() { + namespace bf = boost::filesystem; + bf::path cache_file = bf::path(config_.cache_directory) / bf::path(object_key_ + "-cache"); + return cache_file.string(); + } + + private: void set_file_offset(std::int64_t file_offset) { @@ -1400,6 +1422,7 @@ namespace irods::experimental::io::s3_transport } if (object_status == object_s3_status::IN_S3 && this->download_to_cache_) { + logger::debug("{}:{} ({}) [[{}]] Downloading object to cache", __FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier()); cache_file_download_status download_status = this->download_object_to_cache(shm_obj, s3_object_size); @@ -1458,8 +1481,15 @@ namespace irods::experimental::io::s3_transport } if (!cache_fstream_ || !cache_fstream_.is_open()) { - logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={}", - __FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno)); + logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={} open_mode: [app={}][binary={}][in={}][out={}][trunc={}][ate={}]", + __FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno), + (mode & std::ios::app) != 0, + (mode & std::ios::binary) != 0, + (mode & std::ios::in) != 0, + (mode & std::ios::out) != 0, + (mode & std::ios::trunc) != 0, + (mode & std::ios::ate) != 0 + ); this->set_error(ERROR(UNIX_FILE_OPEN_ERR, "Failed to open S3 cache file")); return_value = false; return;