From afc64c1082459c495a9120360b73ecebf3d5b902 Mon Sep 17 00:00:00 2001 From: Justin James Date: Wed, 10 Jul 2024 11:22:33 -0400 Subject: [PATCH] [2205] Use shared memory to share number of upload threads --- .../s3_resource/multipart_shared_data.hpp | 45 ++ s3_resource/src/s3_operations.cpp | 419 +++++++++++------- .../managed_shared_memory_object.hpp | 13 +- .../s3_transport/multipart_shared_data.hpp | 21 +- .../irods/private/s3_transport/util.hpp | 10 +- 5 files changed, 322 insertions(+), 186 deletions(-) create mode 100644 s3_resource/include/irods/private/s3_resource/multipart_shared_data.hpp diff --git a/s3_resource/include/irods/private/s3_resource/multipart_shared_data.hpp b/s3_resource/include/irods/private/s3_resource/multipart_shared_data.hpp new file mode 100644 index 0000000..1c286a9 --- /dev/null +++ b/s3_resource/include/irods/private/s3_resource/multipart_shared_data.hpp @@ -0,0 +1,45 @@ +#ifndef IRODS_S3_RESOURCE_MULTIPART_SHARED_DATA_HPP +#define IRODS_S3_RESOURCE_MULTIPART_SHARED_DATA_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace irods_s3 +{ + namespace interprocess_types + { + namespace bi = boost::interprocess; + using segment_manager = bi::managed_shared_memory::segment_manager; + using void_allocator = boost::container::scoped_allocator_adaptor >; + } + + // data that needs to be shared among different processes + struct multipart_shared_data + { + explicit multipart_shared_data(const interprocess_types::void_allocator &allocator) + : threads_remaining_to_close{0} + , number_of_threads{0} + , ref_count{0} + {} + + bool can_delete() { + return threads_remaining_to_close == 0; + } + + int threads_remaining_to_close; + int number_of_threads; + int ref_count; + }; // struct multipart_shared_data +} // namespace irods_s3 + +#endif // IRODS_S3_RESOURCE_MULTIPART_SHARED_DATA_HPP diff --git a/s3_resource/src/s3_operations.cpp b/s3_resource/src/s3_operations.cpp index 460cc76..6fced55 100644 --- a/s3_resource/src/s3_operations.cpp +++ b/s3_resource/src/s3_operations.cpp @@ -3,7 +3,9 @@ #include "irods/private/s3_resource/s3_operations.hpp" #include "irods/private/s3_resource/s3_resource.hpp" #include "irods/private/s3_transport/s3_transport.hpp" +#include "irods/private/s3_transport/managed_shared_memory_object.hpp" #include "irods/private/s3_resource/s3_plugin_logging_category.hpp" +#include "irods/private/s3_resource/multipart_shared_data.hpp" // =-=-=-=-=-=-=- // irods includes @@ -68,6 +70,15 @@ using s3_transport_config = irods::experimental::io::s3_transport::config; namespace irods_s3 { + inline static const std::string SHARED_MEMORY_KEY_PREFIX{"irods_s3-shm-"}; + inline static constexpr int DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS{180}; + + // See https://groups.google.com/g/boost-list/c/5ADnEPYg-ho for an explanation + // of why the 100*sizeof(void*) is used below. Essentially, the shared memory + // must have enough space for the memory algorithm and reserved area but there is + // no way of knowing the size for these. It is stated that 100*sizeof(void*) would + // be enough. + inline static constexpr std::int64_t SHMEM_SIZE{100*sizeof(void*) + sizeof(multipart_shared_data) }; namespace log = irods::experimental::log; using logger = log::logger; @@ -160,6 +171,11 @@ namespace irods_s3 { return false; } // end operation_requires_that_object_exists + std::string get_shmem_key(irods::plugin_context& ctx, irods::file_object_ptr file_obj) { + return SHARED_MEMORY_KEY_PREFIX + + std::to_string(std::hash{}(get_resource_name(ctx.prop_map()) + file_obj->logical_path())); + } + irods::error s3_file_stat_operation_with_flag_for_retry_on_not_found(irods::plugin_context& _ctx, struct stat* _statbuf, bool retry_on_not_found ); @@ -171,195 +187,225 @@ namespace irods_s3 { bool query_metadata = true) -> irods::error { using logger_config = irods::experimental::log::logger_config; + using named_shared_memory_object = + irods::experimental::interprocess::shared_memory::named_shared_memory_object + ; std::uint64_t thread_id = std::hash{}(std::this_thread::get_id()); - // ********* DEBUG - print L1desc for all - if (logger_config::get_level() == log::level::debug || logger_config::get_level() == log::level::trace) { + irods::file_object_ptr file_obj = boost::dynamic_pointer_cast(_ctx.fco()); - logger::debug("{}:{} ({}) [[{}]] ------------- L1desc ---------------", - __FILE__, __LINE__, __FUNCTION__, thread_id); - for (int i = 0; i < NUM_L1_DESC; ++i) { - if (L1desc[i].inuseFlag && L1desc[i].dataObjInp && L1desc[i].dataObjInfo) { - int thread_count = L1desc[i].dataObjInp->numThreads; - int oprType = L1desc[i].dataObjInp->oprType; - std::int64_t data_size = L1desc[i].dataSize; - logger::debug("{}:{} ({}) [[{}]] [{}][objPath={}][filePath={}][oprType={}]" - "[requested_number_of_threads={}][dataSize={}][dataObjInfo->dataSize={}][srcL1descInx={}]", - __FILE__, __LINE__, __FUNCTION__, thread_id, i, L1desc[i].dataObjInp->objPath, - L1desc[i].dataObjInfo->filePath, oprType, thread_count, data_size, - L1desc[i].dataObjInfo->dataSize, L1desc[i].srcL1descInx); + // Open shared memory and see if we know the number of threads from another thread + std::string shmem_key = get_shmem_key(_ctx, file_obj); + logger::trace("{}:{} ({}) [[{}]] shmem_key={} hashed_string={}", __FILE__, __LINE__, __FUNCTION__, thread_id, shmem_key, get_resource_name(_ctx.prop_map()) + file_obj->logical_path() ); + + named_shared_memory_object shm_obj{shmem_key, + DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS, + SHMEM_SIZE}; + + // wrapping this in an atomic_exec so only one thread/process for a specific data object is executed at a time + std::string func(__func__); + auto ret_value = shm_obj.atomic_exec([&number_of_threads, &data_size, &oprType, &_ctx, thread_id, file_obj, func](auto& data) { + + oprType = -1; + int requested_number_of_threads = 0; + + // ********* DEBUG - print L1desc for all + if (logger_config::get_level() == log::level::debug || logger_config::get_level() == log::level::trace) { + + logger::debug("{}:{} ({}) [[{}]] ------------- L1desc ---------------", + __FILE__, __LINE__, func, thread_id); + for (int i = 0; i < NUM_L1_DESC; ++i) { + if (L1desc[i].inuseFlag && L1desc[i].dataObjInp && L1desc[i].dataObjInfo) { + int thread_count = L1desc[i].dataObjInp->numThreads; + int oprType = L1desc[i].dataObjInp->oprType; + std::int64_t data_size = L1desc[i].dataSize; + logger::debug("{}:{} ({}) [[{}]] [{}][objPath={}][filePath={}][oprType={}]" + "[requested_number_of_threads={}][dataSize={}][dataObjInfo->dataSize={}][srcL1descInx={}]", + __FILE__, __LINE__, func, thread_id, i, L1desc[i].dataObjInp->objPath, + L1desc[i].dataObjInfo->filePath, oprType, thread_count, data_size, + L1desc[i].dataObjInfo->dataSize, L1desc[i].srcL1descInx); + } } + logger::debug("{}:{} ({}) [[{}]] ------------------------------------", + __FILE__, __LINE__, func, thread_id); } - logger::debug("{}:{} ({}) [[{}]] ------------------------------------", - __FILE__, __LINE__, __FUNCTION__, thread_id); - } - // ********* END DEBUG - print L1desc for all + // ********* END DEBUG - print L1desc for all - oprType = -1; - int requested_number_of_threads = 0; - irods::file_object_ptr file_obj = boost::dynamic_pointer_cast(_ctx.fco()); + if (data.number_of_threads > 0) { + number_of_threads = data.number_of_threads; + } + logger::debug("{}:{} ({}) [[{}]] number_of_threads in shmem = {}", __FILE__, __LINE__, func, thread_id, data.number_of_threads); - // get data size stored earlier in s3_resolve_resc_hier_operation - // brackets reduce scope of lock_guard - { - std::lock_guard lock(global_mutex); - data_size = irods_s3::data_size; - number_of_threads = irods_s3::number_of_threads; - oprType = irods_s3::oprType; - } - - // if data size is still unknown, try to get if from DATA_SIZE_KW - if (data_size == s3_transport_config::UNKNOWN_OBJECT_SIZE) { - char *data_size_str = getValByKey(&file_obj->cond_input(), DATA_SIZE_KW); - logger::debug("{}:{} ({}) [[{}]] data_size_str = {}", __FILE__, __LINE__, __FUNCTION__, thread_id, fmt::ptr(data_size_str)); - if (data_size_str) { - - logger::debug("{}:{} ({}) [[{}]] read DATA_SIZE_KW of {}", - __FILE__, __LINE__, __FUNCTION__, thread_id, data_size_str); - - try { - data_size = boost::lexical_cast(data_size_str); - } catch (boost::bad_lexical_cast const& e) { - data_size = s3_transport_config::UNKNOWN_OBJECT_SIZE; - logger::warn("{}:{} ({}) [[{}]] DATA_SIZE_KW ({}) could not be parsed as std::size_t", - __FILE__, __LINE__, __FUNCTION__, thread_id, data_size_str); + // get data size stored earlier in s3_resolve_resc_hier_operation + // brackets reduce scope of lock_guard + { + std::lock_guard lock(global_mutex); + data_size = irods_s3::data_size; + oprType = irods_s3::oprType; + } + + // if data size is still unknown, try to get if from DATA_SIZE_KW + if (data_size == s3_transport_config::UNKNOWN_OBJECT_SIZE) { + char *data_size_str = getValByKey(&file_obj->cond_input(), DATA_SIZE_KW); + logger::debug("{}:{} ({}) [[{}]] data_size_str = {}", __FILE__, __LINE__, func, thread_id, fmt::ptr(data_size_str)); + if (data_size_str) { + + logger::debug("{}:{} ({}) [[{}]] read DATA_SIZE_KW of {}", + __FILE__, __LINE__, func, thread_id, data_size_str); + + try { + data_size = boost::lexical_cast(data_size_str); + } catch (boost::bad_lexical_cast const& e) { + data_size = s3_transport_config::UNKNOWN_OBJECT_SIZE; + logger::warn("{}:{} ({}) [[{}]] DATA_SIZE_KW ({}) could not be parsed as std::size_t", + __FILE__, __LINE__, func, thread_id, data_size_str); + } } } - } - // first try to get requested number of threads, data size, and oprType from L1desc - // Note: On a replication from an s3 src within a replication node, there are two entries for the - // replica - one for PUT and one for REPL_DEST. During the initial PUT there is only one - // entry. To see of we are doing the PUT or REPL, look for the last entry on the list. - bool found = false; - for (int i = 0; i < NUM_L1_DESC; ++i) { - if (L1desc[i].inuseFlag) { - if (L1desc[i].dataObjInp && L1desc[i].dataObjInfo && - L1desc[i].dataObjInp->objPath == file_obj->logical_path() - && L1desc[i].dataObjInfo->filePath == file_obj->physical_path()) { - - found = true; - requested_number_of_threads = L1desc[i].dataObjInp->numThreads; - oprType = L1desc[i].dataObjInp->oprType; - - // if data_size is zero or UNKNOWN, try to get it from L1desc - if (data_size == s3_transport_config::UNKNOWN_OBJECT_SIZE) { - data_size = L1desc[i].dataSize; + // first try to get requested number of threads, data size, and oprType from L1desc + // Note: On a replication from an s3 src within a replication node, there are two entries for the + // replica - one for PUT and one for REPL_DEST. During the initial PUT there is only one + // entry. To see if we are doing the PUT or REPL, look for the last entry on the list. + bool found = false; + for (int i = 0; i < NUM_L1_DESC; ++i) { + if (L1desc[i].inuseFlag) { + if (L1desc[i].dataObjInp && L1desc[i].dataObjInfo && + L1desc[i].dataObjInp->objPath == file_obj->logical_path() + && L1desc[i].dataObjInfo->filePath == file_obj->physical_path()) { + + found = true; + requested_number_of_threads = L1desc[i].dataObjInp->numThreads; + oprType = L1desc[i].dataObjInp->oprType; + + // if data_size is zero or UNKNOWN, try to get it from L1desc + if (data_size == s3_transport_config::UNKNOWN_OBJECT_SIZE) { + data_size = L1desc[i].dataSize; + } } + } else if (found) { + break; } - } else if (found) { - break; } - } - // special treatment for replication - // 1) data_size is only available from the REPLICATE_SRC entry so use that. - // 2) number_of_threads is available in REPLICATE_DEST entry so use that. - if (oprType == REPLICATE_DEST) { - bool found_data_size = false; - bool found_number_of_threads = false; + // special treatment for replication + // 1) data_size is only available from the REPLICATE_SRC entry so use that. + // 2) number_of_threads is available in REPLICATE_DEST entry so use that. + if (oprType == REPLICATE_DEST) { + bool found_data_size = false; + bool found_number_of_threads = (number_of_threads > 0); - for (int i = 0; i < NUM_L1_DESC; ++i) { - const auto& l1d = L1desc[i]; - const auto* dobj_input = l1d.dataObjInp; - const auto* dobj_info = l1d.dataObjInfo; + for (int i = 0; i < NUM_L1_DESC; ++i) { + const auto& l1d = L1desc[i]; + const auto* dobj_input = l1d.dataObjInp; + const auto* dobj_info = l1d.dataObjInfo; - if (!l1d.inuseFlag || !dobj_input || dobj_input->objPath != file_obj->logical_path()) { - continue; - } + if (!l1d.inuseFlag || !dobj_input || dobj_input->objPath != file_obj->logical_path()) { + continue; + } - // get the data size from source dataObjInfo - if (!found_data_size && dobj_info && dobj_input->oprType == REPLICATE_SRC) { - data_size = dobj_info->dataSize; - logger::debug("{}:{} ({}) [[{}]] repl to s3 destination. setting data_size to {}", - __FILE__, __LINE__, __FUNCTION__, thread_id, data_size); + // get the data size from source dataObjInfo + if (!found_data_size && dobj_info && dobj_input->oprType == REPLICATE_SRC) { + data_size = dobj_info->dataSize; + logger::debug("{}:{} ({}) [[{}]] repl to s3 destination. setting data_size to {}", + __FILE__, __LINE__, func, thread_id, data_size); - found_data_size = true; - } + found_data_size = true; + } + + // get the number_of_threads from destination dataObjInp + if (!found_number_of_threads && dobj_input->oprType == REPLICATE_DEST) { + number_of_threads = dobj_input->numThreads; + logger::debug("{}:{} ({}) [[{}]] repl to s3 destination. setting number_of_threads to {}", + __FILE__, + __LINE__, + func, + thread_id, + number_of_threads); + + found_number_of_threads = true; + } - // get the number_of_threads from destination dataObjInp - if (!found_number_of_threads && dobj_input->oprType == REPLICATE_DEST) { - number_of_threads = dobj_input->numThreads; - logger::debug("{}:{} ({}) [[{}]] repl to s3 destination. setting number_of_threads to {}", - __FILE__, - __LINE__, - __FUNCTION__, - thread_id, - number_of_threads); - - found_number_of_threads = true; + // once we have both pieces of information break out of for loop + if (found_data_size && found_number_of_threads) { + break; + } } - // once we have both pieces of information break out of for loop - if (found_data_size && found_number_of_threads) { - break; + if (!found_number_of_threads) { + return ERROR(SYS_INTERNAL_ERR, + "Replicating from source to destination but was not able to find the replication " + "destination in L1desc table."); } } - if (!found_number_of_threads) { - return ERROR(SYS_INTERNAL_ERR, - "Replicating from source to destination but was not able to find the replication " - "destination in L1desc table."); - } - } + // if number_of_threads is still unknown, first try readng from NUM_THREADS_KW + if (number_of_threads <= 0) { + + // try to get number of threads from NUM_THREADS_KW + char *num_threads_str = getValByKey(&file_obj->cond_input(), NUM_THREADS_KW); + logger::debug("{}:{} ({}) [[{}]] num_threads_str = {}", __FILE__, __LINE__, __FUNCTION__, thread_id, fmt::ptr(num_threads_str)); + + if (num_threads_str) { + logger::debug("{}:{} ({}) [[{}]] num_threads_str = {}", + __FILE__, __LINE__, func, thread_id, num_threads_str); + try { + number_of_threads = boost::lexical_cast(num_threads_str); + } catch (const boost::bad_lexical_cast &) { + number_of_threads = 0; + logger::warn("{}:{} ({}) [[{}]] NUM_THREADS_KW ({}) could not be parsed as int", + __FILE__, __LINE__, func, thread_id, num_threads_str); + } + } - // if number_of_threads is still unknown, first try readng from NUM_THREADS_KW - if (number_of_threads <= 0) { + // If number of threads was not successfully set above. + if (number_of_threads == 0) { + const int single_buff_sz = irods::get_advanced_setting(irods::KW_CFG_MAX_SIZE_FOR_SINGLE_BUFFER) * 1024 * 1024; + + if (data_size > single_buff_sz && oprType != REPLICATE_DEST) { + number_of_threads = getNumThreads(_ctx.comm(), + data_size, + requested_number_of_threads, + const_cast(&file_obj->cond_input()), + nullptr, // destination resc hier + nullptr, // source resc hier + 0); // opr type - not used + } + } + + // If we still don't know the # of threads, set it to 1 unless the oprType is unknown in + // which case it will remain <= 0 which will force use of cache. + if (number_of_threads <= 0 && oprType != -1) { + number_of_threads = 1; + } + } - // try to get number of threads from NUM_THREADS_KW - char *num_threads_str = getValByKey(&file_obj->cond_input(), NUM_THREADS_KW); - logger::debug("{}:{} ({}) [[{}]] num_threads_str = {}", __FILE__, __LINE__, __FUNCTION__, thread_id, fmt::ptr(num_threads_str)); + logger::debug("{}:{} ({}) [[{}]] number_of_threads set to {}", __FILE__, __LINE__, func, thread_id, number_of_threads); - if (num_threads_str) { - logger::debug("{}:{} ({}) [[{}]] num_threads_str = {}", - __FILE__, __LINE__, __FUNCTION__, thread_id, num_threads_str); - try { - number_of_threads = boost::lexical_cast(num_threads_str); - // save the number of threads - { - std::lock_guard lock(global_mutex); - irods_s3::number_of_threads = number_of_threads; - } - } catch (const boost::bad_lexical_cast &) { - number_of_threads = 0; - logger::warn("{}:{} ({}) [[{}]] NUM_THREADS_KW ({}) could not be parsed as int", - __FILE__, __LINE__, __FUNCTION__, thread_id, num_threads_str); - } + // save the number of threads and data_size + { + std::lock_guard lock(global_mutex); + irods_s3::data_size = data_size; + irods_s3::oprType = oprType; } - // If number of threads was not successfully set above. - if (number_of_threads == 0) { - const int single_buff_sz = irods::get_advanced_setting(irods::KW_CFG_MAX_SIZE_FOR_SINGLE_BUFFER) * 1024 * 1024; - - if (data_size > single_buff_sz && oprType != REPLICATE_DEST) { - number_of_threads = getNumThreads(_ctx.comm(), - data_size, - requested_number_of_threads, - const_cast(&file_obj->cond_input()), - nullptr, // destination resc hier - nullptr, // source resc hier - 0); // opr type - not used - } - } + data.number_of_threads = number_of_threads; - // If we still don't know the # of threads, set it to 1 unless the oprType is unknown in - // which case it will remain <= 0 which will force use of cache. - if (number_of_threads <= 0 && oprType != -1) { - number_of_threads = 1; + if (data.threads_remaining_to_close <= 0) { + data.threads_remaining_to_close = number_of_threads; } - } - logger::debug("{}:{} ({}) [[{}]] number_of_threads set to {}", __FILE__, __LINE__, __FUNCTION__, thread_id, number_of_threads); + // If this is GET_OPR, we do not need the shared memory. Set the threads_remaining_to_close to 0 so the shmem will be + // deleted immediately. Note that for GET_OPR we don't necessarily know the number of threads (nor do we need it) and + // this makes it hard to determine when the shared memory can be deleted. + if (oprType == GET_OPR) { + data.threads_remaining_to_close = 0; + } - // save the number of threads and data_size - { - std::lock_guard lock(global_mutex); - irods_s3::number_of_threads = number_of_threads; - irods_s3::data_size = data_size; - irods_s3::oprType = oprType; - } + return SUCCESS(); + }); - return SUCCESS(); + return ret_value; } // update and return the physical path in case of decoupled naming @@ -946,6 +992,10 @@ namespace irods_s3 { if (is_cacheless_mode(_ctx.prop_map())) { + using named_shared_memory_object = + irods::experimental::interprocess::shared_memory::named_shared_memory_object + ; + std::uint64_t thread_id = std::hash{}(std::this_thread::get_id()); logger::debug("{}:{} ({}) [[{}]]", __FILE__, __LINE__, __FUNCTION__, thread_id); @@ -964,10 +1014,26 @@ namespace irods_s3 { std::uint64_t data_size = 0; int number_of_threads; + + // Open shared memory and get the number_of_threads + irods::file_object_ptr file_obj = boost::dynamic_pointer_cast(_ctx.fco()); + std::string shmem_key = get_shmem_key(_ctx, file_obj); + logger::trace("{}:{} ({}) [[{}]] shmem_key={} hashed_string={}", __FILE__, __LINE__, __FUNCTION__, thread_id, shmem_key, get_resource_name(_ctx.prop_map()) + file_obj->logical_path() ); + + named_shared_memory_object shm_obj{shmem_key, + DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS, + SHMEM_SIZE}; + + shm_obj.atomic_exec([&number_of_threads, thread_id](auto& data) { + logger::debug("{}:{} ({}) [[{}]] number_of_threads in shared memory - {}", + __FILE__, __LINE__, __FUNCTION__, thread_id, data.number_of_threads); + number_of_threads = data.number_of_threads; + }); + + // get data_size { std::lock_guard lock(global_mutex); data_size = irods_s3::data_size; - number_of_threads = irods_s3::number_of_threads; } if (number_of_threads == 0) { number_of_threads = 1; @@ -1009,6 +1075,10 @@ namespace irods_s3 { if (is_cacheless_mode(_ctx.prop_map())) { + using named_shared_memory_object = + irods::experimental::interprocess::shared_memory::named_shared_memory_object + ; + std::uint64_t thread_id = std::hash{}(std::this_thread::get_id()); irods::file_object_ptr file_obj = boost::dynamic_pointer_cast(_ctx.fco()); @@ -1055,6 +1125,22 @@ namespace irods_s3 { irods::error result = s3_transport_ptr->get_error(); + // Decrement the threads_remaining_to_close counter in shared memory. + // Not necessary for GET_OPR as the shared memory is not created in that instance. + if (irods_s3::oprType != GET_OPR) { + + std::string shmem_key = get_shmem_key(_ctx, file_obj); + named_shared_memory_object shm_obj{shmem_key, + DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS, + SHMEM_SIZE}; + + auto [open_count, ref_count] = shm_obj.atomic_exec([](auto& data) { + // shmem freed when threads_remaining_to_close is zero + return std::make_pair(--(data.threads_remaining_to_close), data.ref_count); + }); + logger::trace("{}:{} ({}) [[{}]] shmem_key={} hashed_string={} open_count={} ref_coun={}", __FILE__, __LINE__, __func__, thread_id, shmem_key, get_resource_name(_ctx.prop_map()) + file_obj->logical_path(), open_count, ref_count); + } + // because s3 might not provide immediate consistency for subsequent stats, // do a stat with a retry if not found if (s3_transport_ptr->is_last_file_to_close() && result.ok()) { @@ -1069,7 +1155,6 @@ namespace irods_s3 { { std::lock_guard lock(global_mutex); irods_s3::data_size = s3_transport_config::UNKNOWN_OBJECT_SIZE; - irods_s3::number_of_threads = 0; irods_s3::oprType = -1; } @@ -2166,8 +2251,11 @@ namespace irods_s3 { irods::hierarchy_parser* _out_parser, float* _out_vote ) { - logger::debug("{}:{} ({}) [[{}]] _opr={} _curr_host={}", __FILE__, __LINE__, __FUNCTION__, std::hash{}(std::this_thread::get_id()), - _opr == nullptr ? "nullptr" : _opr->c_str(), _curr_host->c_str()); + using named_shared_memory_object = irods::experimental::interprocess::shared_memory::named_shared_memory_object + ; + + logger::debug("{}:{} ({}) [[{}]] _opr={} _curr_host={} shmem_size={}", __FILE__, __LINE__, __FUNCTION__, std::hash{}(std::this_thread::get_id()), + _opr == nullptr ? "nullptr" : _opr->c_str(), _curr_host->c_str(), SHMEM_SIZE); for (int i = 0; i < NUM_FILE_DESC; ++i) { if (FileDesc[i].inuseFlag) { @@ -2216,8 +2304,17 @@ namespace irods_s3 { int number_of_threads = boost::lexical_cast(num_threads_str); // save the number of threads - std::lock_guard lock(global_mutex); - irods_s3::number_of_threads = number_of_threads; + std::string shmem_key = get_shmem_key(_ctx, file_obj); + logger::trace("{}:{} ({}) [[{}]] shmem_key={} hashed_string={}", __FILE__, __LINE__, __FUNCTION__, thread_id, shmem_key, get_resource_name(_ctx.prop_map()) + file_obj->logical_path() ); + + named_shared_memory_object shm_obj{shmem_key, + DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS, + SHMEM_SIZE}; + + shm_obj.atomic_exec([number_of_threads](auto& data) { + data.number_of_threads = number_of_threads; + data.threads_remaining_to_close = number_of_threads; + }); } catch (const boost::bad_lexical_cast &) { number_of_threads = 0; diff --git a/s3_transport/include/irods/private/s3_transport/managed_shared_memory_object.hpp b/s3_transport/include/irods/private/s3_transport/managed_shared_memory_object.hpp index 63726f1..cbd8e8f 100644 --- a/s3_transport/include/irods/private/s3_transport/managed_shared_memory_object.hpp +++ b/s3_transport/include/irods/private/s3_transport/managed_shared_memory_object.hpp @@ -40,7 +40,7 @@ namespace irods::experimental::interprocess , last_access_time_in_seconds(access_time) {} - // T must have reset_fields() and ref_count and can_delete() + // T must have ref_count and can_delete() T thing; time_t last_access_time_in_seconds; @@ -83,7 +83,7 @@ namespace irods::experimental::interprocess if (shmem_has_expired) { - logger::debug("{}:{} ({}) SHMEM_HAS_EXPIRED", __FILE__, __LINE__, __FUNCTION__); + logger::debug("{}:{} ({}) SHMEM_HAS_EXPIRED", __FILE__, __LINE__, __func__); // rebuild shmem object shm_.destroy(SHARED_DATA_NAME.c_str()); @@ -110,9 +110,12 @@ namespace irods::experimental::interprocess object_->thing.~T(); object_ = nullptr; - bi::shared_memory_object::remove(shm_name_.c_str()); - bi::named_mutex::remove(shm_name_.c_str()); - + if (!bi::shared_memory_object::remove(shm_name_.c_str())) { + logger::error("{}:{} ({}) removal of shared memory object [{}] failed", __FILE__, __LINE__, __func__, shm_name_); + } + if (!bi::named_mutex::remove(shm_name_.c_str())) { + logger::error("{}:{} ({}) removal of mutex for shared memory object [{}] failed", __FILE__, __LINE__, __func__, shm_name_); + } } } } diff --git a/s3_transport/include/irods/private/s3_transport/multipart_shared_data.hpp b/s3_transport/include/irods/private/s3_transport/multipart_shared_data.hpp index d0189ae..2d1d3d5 100644 --- a/s3_transport/include/irods/private/s3_transport/multipart_shared_data.hpp +++ b/s3_transport/include/irods/private/s3_transport/multipart_shared_data.hpp @@ -1,5 +1,5 @@ -#ifndef S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP -#define S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP +#ifndef IRODS_S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP +#define IRODS_S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP #include #include @@ -55,21 +55,6 @@ namespace irods::experimental::io::s3_transport::shared_data , know_number_of_threads{true} {} - void reset_fields() - { - threads_remaining_to_close = 0; - done_initiate_multipart = false; - upload_id = ""; - etags.clear(); - last_error_code = error_codes::SUCCESS; - cache_file_download_progress = cache_file_download_status::NOT_STARTED; - ref_count = 1; // current object has reference so ref_count = 1 - circular_buffer_read_timeout = false; - file_open_counter = 0; - cache_file_flushed = false; - know_number_of_threads = true; - } - bool can_delete() { return know_number_of_threads ? threads_remaining_to_close == 0 @@ -94,4 +79,4 @@ namespace irods::experimental::io::s3_transport::shared_data -#endif // S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP +#endif // IRODS_S3_TRANSPORT_MULTIPART_SHARED_DATA_HPP diff --git a/s3_transport/include/irods/private/s3_transport/util.hpp b/s3_transport/include/irods/private/s3_transport/util.hpp index 5a113b5..2c20574 100644 --- a/s3_transport/include/irods/private/s3_transport/util.hpp +++ b/s3_transport/include/irods/private/s3_transport/util.hpp @@ -52,12 +52,18 @@ namespace irods::experimental::io::s3_transport static const std::int64_t BYTES_PER_ETAG{112}; // 80 bytes for every string added, 32 bytes for the vector size, // determined by testing static const std::int64_t UPLOAD_ID_SIZE{128}; - static const std::int64_t MAX_S3_SHMEM_SIZE{sizeof(shared_data::multipart_shared_data) + + + // See https://groups.google.com/g/boost-list/c/5ADnEPYg-ho for an explanation + // of why the 100*sizeof(void*) is used below. Essentially, the shared memory + // must have enough space for the memory algorithm and reserved area but there is + // no way of knowing the size for these. It is stated that 100*sizeof(void*) would + // be enough. + static constexpr std::int64_t MAX_S3_SHMEM_SIZE{100*sizeof(void*) + sizeof(shared_data::multipart_shared_data) + MAXIMUM_NUMBER_ETAGS_PER_UPLOAD * (BYTES_PER_ETAG + 1) + UPLOAD_ID_SIZE + 1}; static const int DEFAULT_SHARED_MEMORY_TIMEOUT_IN_SECONDS{900}; - inline static const std::string SHARED_MEMORY_KEY_PREFIX{"irods_s3-shm-"}; + inline static const std::string SHARED_MEMORY_KEY_PREFIX{"irods_s3_transport-shm-"}; }; void print_bucket_context( const libs3_types::bucket_context& bucket_context );