Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues 2151 2153 #2155

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packaging/resource_suite_s3_nocache.py
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,41 @@ def test_istream_nonexistent_file__issue_6479(self):
'DATA_REPL_STATUS', '0'])
self.user0.run_icommand(['irm', '-f', logical_path])

def test_istream_append__issue_2153(self):
    """Verify istream append ('-a') works twice on an S3-backed object (issue 2153).

    Creates a data object via istream, appends two lines, then checks the
    reported size, the replica status, and the actual replica contents.
    """
    replica_number_in_s3 = 0
    filename = 'test_istream_append__issue_2153.txt'
    logical_path = os.path.join(self.user0.session_collection, filename)
    content = 'this is the original file for istream'
    content_append1 = '\nline1 appended to the file'
    content_append2 = '\nline2 appended to the file'

    try:
        # istream to a new data object and ensure that it is created successfully
        # (replica status '1' means good/at-rest).
        self.user0.assert_icommand(['istream', 'write', logical_path], input=content)
        self.assertEqual(str(1), lib.get_replica_status(self.user0, filename, replica_number_in_s3))

        # Append to the data object twice.
        self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=content_append1)
        self.user0.assert_icommand(['istream', 'write', '-a', logical_path], input=content_append2)

        # Verify the total size and the replica status via ils -l.
        expected_size = len(content) + len(content_append1) + len(content_append2)
        self.user0.assert_icommand(['ils', '-l', logical_path], 'STDOUT', str(expected_size))
        self.assertEqual(str(1), lib.get_replica_status(self.user0, filename, replica_number_in_s3))

        # Ensure that the replica actually contains the contents streamed into it.
        self.user0.assert_icommand(['istream', 'read', logical_path], 'STDOUT',
                                   [content, content_append1, content_append2])

    finally:
        # Force the replica status to '0' so the object can be removed even if
        # the test failed and left it stuck in the locked status.
        self.admin.run_icommand([
            'iadmin', 'modrepl',
            'logical_path', logical_path,
            'replica_number', str(replica_number_in_s3),
            'DATA_REPL_STATUS', '0'])
        self.user0.run_icommand(['irm', '-f', logical_path])

def test_iput_with_invalid_secret_key_and_overwrite__issue_6154(self):
replica_number_in_s3 = 0
Expand Down
61 changes: 60 additions & 1 deletion s3/s3_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ namespace irods_s3 {
mode |= ios_base::in;
}

if ((oflag & O_TRUNC) || (oflag & O_CREAT)) {
if ((oflag & O_TRUNC)) {
mode |= ios_base::trunc;
}

Expand All @@ -450,6 +450,19 @@ namespace irods_s3 {
mode &= ~ios_base::trunc; // turn off trunc flag
}

logger::debug("{}:{} ({}) [[{}]] translated open mode is [app={}][binary={}][in={}][out={}][trunc={}][ate={}]",
__FILE__,
__LINE__,
__FUNCTION__,
thread_id,
(mode & std::ios::app) != 0,
(mode & std::ios::binary) != 0,
(mode & std::ios::in) != 0,
(mode & std::ios::out) != 0,
(mode & std::ios::trunc) != 0,
(mode & std::ios::ate) != 0
);

return mode;

}
Expand Down Expand Up @@ -1321,8 +1334,54 @@ namespace irods_s3 {
irods::plugin_context& _ctx,
struct stat* _statbuf )
{
if (!is_cacheless_mode(_ctx.prop_map())) {
return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false);
}

// cacheless mode
std::uint64_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
logger::debug("{}:{} ({}) [[{}]]", __FILE__, __LINE__, __FUNCTION__, thread_id);

// issue 2153 - Sometimes a stat is called before a close. In the case that we
// are in cacheless mode but using a local cache file, and that cache file has not yet
// been flushed, do a stat of that cache file instead of doing a HEAD to S3.
//
// We need the fd to get the transport object. Unfortunately for some reason file_obj->file_descriptor()
// is not set at this point so we will have to search through the L1desc table for
// the objPath and get the fd from that.
irods::file_object_ptr file_obj = boost::dynamic_pointer_cast<irods::file_object>(_ctx.fco());
int fd = 0;
for (int i = 0; i < NUM_L1_DESC; ++i) {
if (L1desc[i].inuseFlag && L1desc[i].dataObjInp && L1desc[i].dataObjInfo) {
if (L1desc[i].dataObjInp->objPath == file_obj->logical_path()) {
fd = i;
break;
}
}
}

if (fd_data.exists(fd)) {
per_thread_data data = fd_data.get(fd);
if (data.dstream_ptr && data.s3_transport_ptr && data.s3_transport_ptr->is_cache_file_open()) {

// do a stat on the cache file, populate stat_buf, and return
std::string cache_file_physical_path = data.s3_transport_ptr->get_cache_file_path();

const int status = stat(cache_file_physical_path.c_str(), _statbuf );

// return an error if necessary
if (status < 0) {
const int err_status = UNIX_FILE_STAT_ERR - errno;
return ERROR(err_status, fmt::format(
"Stat error for \"{}\", errno = \"{}\", status = {}.",
cache_file_physical_path.c_str(), strerror(errno), err_status));
}

return CODE(status);
}
}

// there is not an open cache file, do the normal HEAD to S3
return s3_file_stat_operation_with_flag_for_retry_on_not_found(_ctx, _statbuf, false);
}

Expand Down
37 changes: 34 additions & 3 deletions s3/s3_transport/include/s3_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ namespace irods::experimental::io::s3_transport
last_file_to_close_, data.know_number_of_threads, data.threads_remaining_to_close);

// if a critical error occurred - do not flush cache file or complete multipart upload

if (!this->error_.ok()) {

return_value = false;
Expand Down Expand Up @@ -836,6 +835,30 @@ namespace irods::experimental::io::s3_transport
return existing_object_size_;
}

// Returns true while a local cache file is in use and has not yet been
// flushed to S3. Always false when caching is disabled for this transport.
bool is_cache_file_open() {

    // No cache in use means there is nothing that could be open.
    if (!use_cache_) {
        return false;
    }

    named_shared_memory_object shm_obj{shmem_key_,
        config_.shared_memory_timeout_in_seconds,
        constants::MAX_S3_SHMEM_SIZE};

    // The flushed flag lives in shared memory; read it atomically.
    const bool still_open = shm_obj.atomic_exec([](auto& data) -> bool {
        return !data.cache_file_flushed;
    });

    return still_open;
}

// Builds the physical path of the local cache file:
// "<cache_directory>/<object_key>-cache".
std::string get_cache_file_path() {
    namespace bf = boost::filesystem;
    const bf::path full_path = bf::path{config_.cache_directory} / (object_key_ + "-cache");
    return full_path.string();
}


private:

void set_file_offset(std::int64_t file_offset) {
Expand Down Expand Up @@ -1400,6 +1423,7 @@ namespace irods::experimental::io::s3_transport
}

if (object_status == object_s3_status::IN_S3 && this->download_to_cache_) {
logger::debug("{}:{} ({}) [[{}]] Downloading object to cache", __FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier());

cache_file_download_status download_status = this->download_object_to_cache(shm_obj, s3_object_size);

Expand Down Expand Up @@ -1458,8 +1482,15 @@ namespace irods::experimental::io::s3_transport
}

if (!cache_fstream_ || !cache_fstream_.is_open()) {
logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={}",
__FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno));
logger::error("{}:{} ({}) [[{}]] Failed to open cache file {}, error={} open_mode: [app={}][binary={}][in={}][out={}][trunc={}][ate={}]",
__FILE__, __LINE__, __FUNCTION__, this->get_thread_identifier(), cache_file_path_.c_str(), strerror(errno),
(mode & std::ios::app) != 0,
(mode & std::ios::binary) != 0,
(mode & std::ios::in) != 0,
(mode & std::ios::out) != 0,
(mode & std::ios::trunc) != 0,
(mode & std::ios::ate) != 0
);
this->set_error(ERROR(UNIX_FILE_OPEN_ERR, "Failed to open S3 cache file"));
return_value = false;
return;
Expand Down
Loading