diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp index 37265b7ba..e7f79048d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp @@ -130,9 +130,8 @@ void PushStreamIterator::removeCurrentElement() d_currentElement = d_currentElement->next(); ++d_currentOrdinal; - d_owner_p->remove(del); - d_owner_p->destroy(del, true); - // doKeepGuid because of the d_iterator + d_owner_p->remove(del, false); + // cannot erase the GUID because of the d_iterator if (d_iterator->second.numElements() == 0) { BSLS_ASSERT_SAFE(d_currentElement == 0); @@ -291,9 +290,8 @@ bool VirtualPushStreamIterator::advance() d_currentElement = d_currentElement->nextInApp(); - d_owner_p->remove(del); - d_owner_p->destroy(del, false); - // do not keep Guid + d_owner_p->remove(del, true); + // can erase GUID if (atEnd()) { return false; diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.h b/src/groups/mqb/mqbblp/mqbblp_pushstream.h index 9fc594b1e..4a39a99f0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.h +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.h @@ -194,9 +194,11 @@ struct PushStream { void add(Element* element); /// Remove the specified `element` from both GUID and App corresponding to - /// the `element` (and specified when constructing the `element`). - /// Return the number of remaining Elements in the corresponding GUID. - unsigned int remove(Element* element); + /// the `element` (and specified when constructing the `element`). If + /// there are no more elements in the App, erase the App. If the specified + /// `canEraseGuid` is `true` and there are no more elements in the GUID, + /// erase the GUID. + void remove(Element* element, bool canEraseGuid); /// Remove all PushStream Elements corresponding to the specified /// `upstreamSubQueueId`. Erase each corresponding GUIDs from the @@ -218,9 +220,6 @@ struct PushStream { Element* create(const bmqp::SubQueueInfo& info, const iterator& iterator, const Apps::iterator& iteratorApp); - - /// Destroy the specified `element` - void destroy(Element* element, bool doKeepGuid); }; // ======================== @@ -605,6 +604,7 @@ inline void PushStream::App::add(Element* element) { d_elements.add(element, e_APP); } + inline void PushStream::App::remove(Element* element) { d_elements.remove(element, e_APP); @@ -631,19 +631,6 @@ PushStream::create(const bmqp::SubQueueInfo& subscription, return element; } -inline void PushStream::destroy(Element* element, bool doKeepGuid) -{ - if (element->app().d_elements.numElements() == 0) { - element->eraseApp(d_apps); - } - - if (!doKeepGuid && element->guid().numElements() == 0) { - element->eraseGuid(d_stream); - } - - d_pushElementsPool_sp->deallocate(element); -} - inline PushStream::iterator PushStream::findOrAppendMessage(const bmqt::MessageGUID& guid) { @@ -662,7 +649,7 @@ inline void PushStream::add(Element* element) element->app().add(element); } -inline unsigned int PushStream::remove(Element* element) +inline void PushStream::remove(Element* element, bool canEraseGuid) { BSLS_ASSERT_SAFE(element); BSLS_ASSERT_SAFE(!element->equal(d_stream.end())); @@ -673,7 +660,15 @@ inline unsigned int PushStream::remove(Element* element) // remove from the guid element->guid().remove(element, e_GUID); - return element->guid().numElements(); + if (element->app().d_elements.numElements() == 0) { + element->eraseApp(d_apps); + } + + if (canEraseGuid && element->guid().numElements() == 0) { + element->eraseGuid(d_stream); + } + + d_pushElementsPool_sp->deallocate(element); } inline unsigned int PushStream::removeApp(unsigned int upstreamSubQueueId) @@ -695,10 +690,9 @@ inline unsigned int PushStream::removeApp(Apps::iterator itApp) for (unsigned int count = 0; count < numElements; ++count) { Element* element = itApp->second.d_elements.front(); - remove(element); - - destroy(element, false); - // do not keep Guid + remove(element, true); + // do not keep Guid. This relies on either 'beforeOneAppRemoved' or + // resetting iterator(s). } return numElements; diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp index f6506be07..df28b83b9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp @@ -58,8 +58,7 @@ static void test1_basic() itApp); ps.add(element); - ps.remove(element); - ps.destroy(element, false); + ps.remove(element, true); } static void test2_iterations() @@ -183,10 +182,8 @@ static void test2_iterations() ASSERT(vit.atEnd()); } - ps.remove(element2); - ps.destroy(element2, false); - ps.remove(element3); - ps.destroy(element3, false); + ps.remove(element2, true); + ps.remove(element3, true); } // ============================================================================ diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index 92c7876ff..023b3169f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -633,7 +633,8 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext( mqbi::Queue* queue, bslma::Allocator* allocator) : d_consumers(allocator) -, d_isReady(false) +, d_numApps(0) +, d_numStops(0) , d_currentMessage(0) , d_queue_p(queue) , d_timeDelta() @@ -652,25 +653,26 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext( BSLS_ASSERT_SAFE(queue); } -void QueueEngineUtil_AppsDeliveryContext::start() -{ - d_isReady = true; -} - bool QueueEngineUtil_AppsDeliveryContext::reset( mqbi::StorageIterator* currentMessage) { d_consumers.clear(); d_timeDelta.reset(); - if (!d_isReady) { - return false; // RETURN + bool result = false; + + if (haveProgress() && currentMessage && currentMessage->hasReceipt()) { + d_currentMessage = currentMessage; + result = true; + } + else { + d_currentMessage = 0; } - d_currentMessage = currentMessage; - d_isReady = false; + d_numApps = 0; + d_numStops = 0; - return d_currentMessage ? d_currentMessage->hasReceipt() : false; + return result; } bool QueueEngineUtil_AppsDeliveryContext::processApp( @@ -679,12 +681,12 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp( { BSLS_ASSERT_SAFE(d_currentMessage->hasReceipt()); + ++d_numApps; + if (d_queue_p->isDeliverAll()) { // collect all handles app.routing()->iterateConsumers(d_broadcastVisitor, d_currentMessage); - d_isReady = true; - // Broadcast does not need stats nor any special per-message treatment. return false; // RETURN } @@ -695,6 +697,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp( // The queue iterator can advance leaving the 'app' behind. app.setResumePoint(d_currentMessage->guid()); } + ++d_numStops; // else the existing resumePoint is earlier (if authorized) return false; // RETURN } @@ -703,7 +706,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp( ordinal); if (!appView.isNew()) { - d_isReady = true; return true; // RETURN } @@ -736,7 +738,9 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp( // Early return. // If all Apps return 'e_NO_CAPACITY_ALL', stop the iteration - // (d_isReady == false). + // (d_numApps == 0). + + ++d_numStops; return false; // RETURN } @@ -750,7 +754,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp( } // Still making progress (result != Routers::e_NO_CAPACITY_ALL) - d_isReady = true; return (result == Routers::e_SUCCESS); } @@ -811,7 +814,7 @@ void QueueEngineUtil_AppsDeliveryContext::deliverMessage() } } - if (d_isReady) { + if (haveProgress()) { d_currentMessage->advance(); } @@ -823,6 +826,11 @@ bool QueueEngineUtil_AppsDeliveryContext::isEmpty() const return d_consumers.empty(); } +bool QueueEngineUtil_AppsDeliveryContext::haveProgress() const +{ + return (d_numStops < d_numApps || d_numApps == 0); +} + bsls::Types::Int64 QueueEngineUtil_AppsDeliveryContext::timeDelta() { if (!d_timeDelta.has_value()) { diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h index c6b0f95c7..9f9dfe2d3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h @@ -599,7 +599,8 @@ struct QueueEngineUtil_AppsDeliveryContext { private: Consumers d_consumers; - bool d_isReady; + int d_numApps; + int d_numStops; // Apps not moving mqbi::StorageIterator* d_currentMessage; mqbi::Queue* d_queue_p; bsl::optional d_timeDelta; @@ -626,9 +627,6 @@ struct QueueEngineUtil_AppsDeliveryContext { QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue, bslma::Allocator* allocator); - /// Start delivery cycle(s). - void start(); - /// Prepare the context to process next message. /// Return `true` if the delivery can continue iterating dataStream /// The `false` return value indicates either the end of the dataStream or @@ -662,6 +660,9 @@ struct QueueEngineUtil_AppsDeliveryContext { /// Return `true` if there is at least one delivery target selected. bool isEmpty() const; + /// Return `true` if not all Apps are at capacity or there are no Apps. + bool haveProgress() const; + bsls::Types::Int64 timeDelta(); }; diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 559af8142..48acd1aba 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -580,8 +580,6 @@ void RelayQueueEngine::deliverMessages() // 1. End of storage; or // 2. All subStreams return 'e_NO_CAPACITY_ALL' - d_appsDeliveryContext.start(); - while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) { // Assume, all Apps need to deliver (some may be at capacity) unsigned int numApps = d_storageIter_mp->numApps(); @@ -604,14 +602,13 @@ void RelayQueueEngine::deliverMessages() d_storageIter_mp->removeCurrentElement(); } - - if (d_appsDeliveryContext.processApp(*app, i)) { + else if (d_appsDeliveryContext.processApp(*app, i)) { // The current element has made it either to delivery or - // putAside or resumerPoint and it can be removed + // putAside and it can be removed d_storageIter_mp->removeCurrentElement(); } - // Else, the current element has made it to resumerPoint and - // it cannot be removed + // Else, the current element has made it to resumePoint and it + // cannot be removed. } d_appsDeliveryContext.deliverMessage(); } @@ -1919,14 +1916,22 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes, void RelayQueueEngine::beforeOneAppRemoved(unsigned int upstreamSubQueueId) { while (!d_storageIter_mp->atEnd()) { - if (d_storageIter_mp->numApps() > 1) { + const int numApps = d_storageIter_mp->numApps(); + if (numApps > 1) { // Removal of App's elements will not invalidate 'd_storageIter_mp' break; } + if (numApps == 1) { + const PushStream::Element* element = d_storageIter_mp->element(0); + if (element->app().d_app->upstreamSubQueueId() != + upstreamSubQueueId) { + break; + } + } + else { + BSLS_ASSERT_SAFE(numApps == 0); - const PushStream::Element* element = d_storageIter_mp->element(0); - if (element->app().d_app->upstreamSubQueueId() != upstreamSubQueueId) { - break; + // The case when 'advance' does not follow 'removeCurrentElement' } d_storageIter_mp->advance(); diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 4e6425b41..ee4fafe7b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -1276,7 +1276,6 @@ void RootQueueEngine::afterNewMessage( d_queueState_p->queue())); // Deliver new messages to active (alive and capable to deliver) consumers - d_appsDeliveryContext.start(); while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) { // Assume, all Apps need to deliver (some may be at capacity) diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp index 8cd24b5bf..fabab605c 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp @@ -422,9 +422,10 @@ int VirtualStorageCatalog::addVirtualStorage(bsl::ostream& errorDescription, appOrdinal = d_nextOrdinal++; } else { - appOrdinal = d_availableOrdinals.front(); + AvailableOrdinals::const_iterator first = d_availableOrdinals.cbegin(); + appOrdinal = *first; // There is no conflict because everything 'appOrdinal' was removed. - d_availableOrdinals.pop_front(); + d_availableOrdinals.erase(first); } BSLS_ASSERT_SAFE(appOrdinal <= d_virtualStorages.size()); @@ -468,7 +469,7 @@ bool VirtualStorageCatalog::removeVirtualStorage( removeAll(appKey); const VirtualStorage& vs = *it->value(); - d_availableOrdinals.push_back(vs.ordinal()); + d_availableOrdinals.insert(vs.ordinal()); if (d_queue_p) { BSLS_ASSERT_SAFE(d_queue_p->queueEngine()); diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h index 94f89762f..ede15bf1f 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h @@ -86,7 +86,7 @@ class VirtualStorageCatalog { typedef bsl::shared_ptr VirtualStorageSp; /// List of available ordinal values for Virtual Storages. - typedef bsl::list AvailableOrdinals; + typedef bsl::set AvailableOrdinals; /// appKey -> virtualStorage typedef bmqc::