diff --git a/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp b/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp index 798401ce46..eeca49ed5c 100644 --- a/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp +++ b/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp @@ -20,5 +20,13 @@ namespace BloombergLP { namespace bmqc { +// ------------------------------------------- +// struct OrderedHashMapWithHistory_ImpDetails +// ------------------------------------------- + +const int + OrderedHashMapWithHistory_ImpDetails::k_INSERT_GC_MESSAGES_BATCH_SIZE = + 1000; + } // close package namespace } // close enterprise namespace diff --git a/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h b/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h index 36daf372c2..58557f50b8 100644 --- a/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h +++ b/src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h @@ -53,6 +53,19 @@ namespace BloombergLP { namespace bmqc { +// =========================================== +// struct OrderedHashMapWithHistory_ImpDetails +// =========================================== + +/// PRIVATE CLASS. +// For use only by `bmqc::OrderedHashMapWithHistory` implementation. +struct OrderedHashMapWithHistory_ImpDetails { + // PRIVATE CLASS DATA + /// How many messages to GC when GC required in + /// `bmqc::OrderedHashMapWithHistory::insert` + static const int k_INSERT_GC_MESSAGES_BATCH_SIZE; +}; + // ======================================== // class OrderedHashMapWithHistory_Iterator // ======================================== @@ -217,7 +230,27 @@ class OrderedHashMapWithHistory { size_t d_historySize; // how many historical (!d_isLive) items - gc_iterator d_gcIt; // where to start 'gc' + /// Whether this container has more elements to `gc`. This flag might be + /// set or unset during every `gc` call according to this container's + /// needs. + bool d_requireGC; + + /// The `now` time of the last GC. We assume that the current actual time + /// is no less than this timestamp. + TimeType d_lastGCTime; + + /// The iterator pointing to the element where garbage collection should + /// continue once `gc` is called. According to contract, this iterator + /// only goes forward. All the elements passed by this iterator are either + /// removed or marked for removal, depending on what happened first: + /// - If the element was not erased by the user before, but its timeout + /// happened in this container, it is marked for deletion in `gc` and + /// iterator goes forward. Next, it is the user's responsibility to call + /// `erase` on this element to fully remove it. + /// - If the user removes the element before its timeout happened, the + /// element becomes `not alive`, but still lives in the history. + /// Eventually `gc` reaches this element and fully removes it. + gc_iterator d_gcIt; // PRIVATE CLASS METHODS static const KEY& get_key(const bsl::pair& value) @@ -486,6 +519,8 @@ inline OrderedHashMapWithHistory:: , d_first(d_impl.end()) , d_last(d_impl.end()) , d_historySize(0) +, d_requireGC(false) +, d_lastGCTime(0) , d_gcIt(endGc()) { // NOTHING @@ -515,6 +550,8 @@ inline void OrderedHashMapWithHistory::clear() d_first = d_last = end(); d_gcIt = endGc(); + d_requireGC = false; + d_lastGCTime = 0; d_historySize = 0; } @@ -617,6 +654,12 @@ OrderedHashMapWithHistory::insert( const SOURCE_TYPE& value, TimeType timePoint) { + if (d_requireGC) { + gc(bsl::max(timePoint, d_lastGCTime), + OrderedHashMapWithHistory_ImpDetails:: + k_INSERT_GC_MESSAGES_BATCH_SIZE); + } + bsl::pair result = d_impl.insert( Value(value, d_timeout ? timePoint + d_timeout : 0)); // No need to keep track of element's timePoint if the map is not @@ -652,7 +695,9 @@ OrderedHashMapWithHistory::gc(TimeType now, // 'erase' can set the iterator back to erase item if its expiration time // is sooner than the current one. - if (d_gcIt == endGc()) { + d_lastGCTime = now; + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_gcIt == endGc())) { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; d_gcIt = beginGc(); } @@ -664,12 +709,15 @@ OrderedHashMapWithHistory::gc(TimeType now, } gc_iterator it = d_gcIt++; if (it->d_isLive) { - // This is an old item. It should be removed by 'erase'. - // No need to return to this item. No need to check time again. - // Indicate that it needs to be removed by setting its time to 0. + // This item was not erased by the user yet, but its timeout in + // this container happened. Mark it for deletion by setting + // `d_time` to 0, so the next time user calls `erase` on it, it + // will be fully removed. it->d_time = 0; } else { + // This item was erased by the user before, and we can fully remove + // it right here. d_impl.erase(it); --d_historySize; } @@ -677,10 +725,12 @@ OrderedHashMapWithHistory::gc(TimeType now, if (--batchSize == 0) { // remember where we have stopped and resume from there next time + d_requireGC = true; return true; // RETURN } } + d_requireGC = false; return false; } diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index e319f48f38..1cb1950413 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -72,7 +72,7 @@ namespace BloombergLP { namespace mqbblp { namespace { -const int k_GC_MESSAGES_INTERVAL_SECONDS = 30; +const int k_GC_MESSAGES_INTERVAL_SECONDS = 5; bsl::ostream& printRecoveryBanner(bsl::ostream& out, const bsl::string& lastLineSuffix) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d472aa809c..2c2a912ffc 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -49,7 +49,7 @@ namespace BloombergLP { namespace mqbc { namespace { -const int k_GC_MESSAGES_INTERVAL_SECONDS = 30; +const int k_GC_MESSAGES_INTERVAL_SECONDS = 5; bool isPrimaryActive(const mqbi::StorageManager_PartitionInfo pinfo) { diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 0b59ae95a7..06594c71d7 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -7212,22 +7212,18 @@ void FileStore::flush() return; // RETURN } - const bool haveMore = gcExpiredMessages(bdlt::CurrentTime::utc()); - const bool haveMoreHistory = gcHistory(); + const bool BSLA_MAYBE_UNUSED haveMore = gcExpiredMessages( + bdlt::CurrentTime::utc()); + const bool BSLA_MAYBE_UNUSED haveMoreHistory = gcHistory(); // This is either Idle or k_GC_MESSAGES_INTERVAL_SECONDS timeout. - // 'gcHistory' attempts to iterate all old items. If there are more of them - // than the batchSize (1000), it returns 'true'. In this case, re-enable - // flush client to call it again next Idle time. - // If it returns 'false', there is no immediate work. Wait for the - // next k_GC_MESSAGES_INTERVAL_SECONDS. - - if (haveMore || haveMoreHistory) { - // Explicitly schedule 'flush()' instead of relying on idleness - dispatcher()->execute(bdlf::BindUtil::bind(&FileStore::flush, this), - this, - mqbi::DispatcherEventType::e_CALLBACK); - } + // We try to remove at most k_GC_MESSAGES_BATCH_SIZE items in history. + // If there are more items ready to remove, the container's state changes, + // so any additional `insert` operation to the container will cause + // additional GC, until all old items are removed. + // If we don't balance adding new elements to the history with GC history, + // we might lose a lot of time on allocations of new items to the history, + // as well as get OOM due to uncontrollable history size increase. } void FileStore::setReplicationFactor(int value)