diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs index a29e1db5a70..5d0b8db172c 100644 --- a/rs/replicated_state/src/canister_state/queues.rs +++ b/rs/replicated_state/src/canister_state/queues.rs @@ -141,7 +141,7 @@ pub struct CanisterQueues { /// no corresponding message in the message pool; or entry in the compact /// response maps (which record the `CallbackIds` of expired / shed inbound /// best-effort responses). - canister_queues: BTreeMap, + canister_queues: BTreeMap, Arc)>, /// Backing store for `canister_queues` references, combining a `MessagePool` /// and maps of compact responses (`CallbackIds` of expired / shed responses), @@ -180,7 +180,7 @@ pub struct CanisterQueues { pub struct CanisterOutputQueuesIterator<'a> { /// Priority queue of non-empty output queues. The next message to be popped /// / peeked is the one at the front of the first queue. - queues: VecDeque<(&'a CanisterId, &'a mut OutputQueue)>, + queues: VecDeque<(&'a CanisterId, &'a mut Arc)>, /// Mutable store holding the messages referenced by `queues`. store: &'a mut MessageStoreImpl, @@ -194,7 +194,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> { /// `CanisterQueues::canister_queues` (a map of `CanisterId` to an input queue, /// output queue pair) and `MessagePool`. fn new( - queues: &'a mut BTreeMap, + queues: &'a mut BTreeMap, Arc)>, store: &'a mut MessageStoreImpl, ) -> Self { let queues: VecDeque<_> = queues @@ -281,7 +281,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> { /// Computes the number of (potentially stale) messages left in `queues`. /// /// Time complexity: `O(n)`. - fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut OutputQueue)>) -> usize { + fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut Arc)>) -> usize { queues.iter().map(|(_, q)| q.len()).sum() } } @@ -383,10 +383,15 @@ impl MessageStoreImpl { /// next non-stale reference. /// /// Panics if the reference at the front of the queue is stale. - fn queue_pop_and_advance(&mut self, queue: &mut CanisterQueue) -> Option + fn queue_pop_and_advance(&mut self, queue: &mut Arc>) -> Option where MessageStoreImpl: MessageStore, { + if queue.len() == 0 { + return None; + } + + let queue = Arc::make_mut(queue); let reference = queue.pop()?; // Advance to the next non-stale reference. @@ -548,7 +553,7 @@ trait InboundMessageStore: MessageStore { /// Time complexity: `O(n * log(n))`. fn callbacks_with_enqueued_response( &self, - canister_queues: &BTreeMap, + canister_queues: &BTreeMap, Arc)>, ) -> Result, String>; } @@ -561,7 +566,7 @@ impl InboundMessageStore for MessageStoreImpl { fn callbacks_with_enqueued_response( &self, - canister_queues: &BTreeMap, + canister_queues: &BTreeMap, Arc)>, ) -> Result, String> { let mut callbacks = BTreeSet::new(); canister_queues @@ -654,7 +659,11 @@ impl CanisterQueues { F: FnMut(&CanisterId, &RequestOrResponse) -> Result<(), ()>, { for (canister_id, (_, queue)) in self.canister_queues.iter_mut() { - while let Some(reference) = queue.peek() { + loop { + let Some(reference) = queue.peek() else { + break; + }; + let queue = Arc::make_mut(queue); let Some(msg) = self.store.pool.get(reference) else { // Expired / dropped message. Pop it and advance. assert_eq!(Some(reference), queue.pop()); @@ -740,10 +749,10 @@ impl CanisterQueues { } // Safe to already (attempt to) reserve an output slot here, as the `push()` // below is guaranteed to succeed due to the check above. - if let Err(e) = output_queue.try_reserve_response_slot() { + if let Err(e) = Arc::make_mut(output_queue).try_reserve_response_slot() { return Err((e, msg)); } - input_queue + Arc::make_mut(input_queue) } RequestOrResponse::Response(ref response) => { match self.canister_queues.get_mut(&sender) { @@ -768,7 +777,7 @@ impl CanisterQueues { return Ok(false); } } - queue + Arc::make_mut(queue) } // Queue does not exist or has no reserved slot for this response. @@ -860,7 +869,7 @@ impl CanisterQueues { } let reference = self.store.push_inbound_timeout_response(callback_id); - input_queue.push_response(reference); + Arc::make_mut(input_queue).push_response(reference); self.queue_stats.on_push_timeout_response(); // Add sender canister ID to the appropriate input schedule queue if it is not @@ -1076,7 +1085,7 @@ impl CanisterQueues { if let Err(e) = output_queue.check_has_request_slot() { return Err((e, request)); } - if let Err(e) = input_queue.try_reserve_response_slot() { + if let Err(e) = Arc::make_mut(input_queue).try_reserve_response_slot() { return Err((e, request)); } @@ -1084,7 +1093,7 @@ impl CanisterQueues { .on_push_request(&request, Context::Outbound); let reference = self.store.pool.insert_outbound_request(request, time); - output_queue.push_request(reference); + Arc::make_mut(output_queue).push_request(reference); debug_assert_eq!(Ok(()), self.test_invariants()); Ok(()) @@ -1113,7 +1122,7 @@ impl CanisterQueues { let (input_queue, _output_queue) = get_or_insert_queues(&mut self.canister_queues, &request.receiver); - input_queue.try_reserve_response_slot()?; + Arc::make_mut(input_queue).try_reserve_response_slot()?; self.queue_stats .on_push_request(&request, Context::Outbound); debug_assert_eq!(Ok(()), self.test_invariants()); @@ -1172,7 +1181,7 @@ impl CanisterQueues { .expect("pushing response into inexistent output queue") .1; let reference = self.store.pool.insert_outbound_response(response); - output_queue.push_response(reference); + Arc::make_mut(output_queue).push_response(reference); debug_assert_eq!(Ok(()), self.test_invariants()); } @@ -1491,12 +1500,13 @@ impl CanisterQueues { .expect("No matching queue for dropped message."); if input_queue.peek() == Some(reference) { + let input_queue = Arc::make_mut(input_queue); input_queue.pop(); self.store.queue_advance(input_queue); } // Release the outbound response slot. - output_queue.release_reserved_response_slot(); + Arc::make_mut(output_queue).release_reserved_response_slot(); self.queue_stats.on_drop_input_request(&request); } } @@ -1530,6 +1540,7 @@ impl CanisterQueues { // a queue containing references `[1, 2]`; `1` and `2` expire as part of the // same `time_out_messages()` call; `on_message_dropped(1)` will also pop `2`). if output_queue.peek() == Some(reference) { + let output_queue = Arc::make_mut(output_queue); output_queue.pop(); self.store.queue_advance(output_queue); } @@ -1550,7 +1561,7 @@ impl CanisterQueues { .callbacks_with_enqueued_response .insert(response.originator_reply_callback)); let reference = self.store.insert_inbound(response.into()); - input_queue.push_response(reference); + Arc::make_mut(input_queue).push_response(reference); // If the input queue is not already in a sender schedule, add it. if input_queue.len() == 1 { @@ -1602,7 +1613,7 @@ impl CanisterQueues { self.input_schedule.test_invariants( self.canister_queues .iter() - .map(|(canister_id, (input_queue, _))| (canister_id, input_queue)), + .map(|(canister_id, (input_queue, _))| (canister_id, &**input_queue)), &input_queue_type_fn, ) } @@ -1659,7 +1670,7 @@ impl CanisterQueues { /// /// Time complexity: `O(canister_queues.len())`. fn calculate_queue_stats( - canister_queues: &BTreeMap, + canister_queues: &BTreeMap, Arc)>, guaranteed_response_memory_reservations: usize, transient_stream_guaranteed_responses_size_bytes: usize, ) -> QueueStats { @@ -1684,12 +1695,12 @@ impl CanisterQueues { /// Written as a free function in order to avoid borrowing the full /// `CanisterQueues`, which then requires looking up the queues again. fn get_or_insert_queues<'a>( - canister_queues: &'a mut BTreeMap, + canister_queues: &'a mut BTreeMap, Arc)>, canister_id: &CanisterId, -) -> (&'a mut InputQueue, &'a mut OutputQueue) { +) -> (&'a mut Arc, &'a mut Arc) { let (input_queue, output_queue) = canister_queues.entry(*canister_id).or_insert_with(|| { - let input_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); - let output_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); + let input_queue = Arc::new(CanisterQueue::new(DEFAULT_QUEUE_CAPACITY)); + let output_queue = Arc::new(CanisterQueue::new(DEFAULT_QUEUE_CAPACITY)); (input_queue, output_queue) }); (input_queue, output_queue) @@ -1749,8 +1760,8 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues { .iter() .map(|(canid, (iq, oq))| CanisterQueuePair { canister_id: Some(pb_types::CanisterId::from(*canid)), - input_queue: Some(iq.into()), - output_queue: Some(oq.into()), + input_queue: Some((&**iq).into()), + output_queue: Some((&**oq).into()), }) .collect(), pool: if item.store.pool != MessagePool::default() { @@ -1826,7 +1837,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can } }); - Ok((canister_id, (iq, oq))) + Ok((canister_id, (Arc::new(iq), Arc::new(oq)))) }) .collect::>()?;