Skip to content

Commit

Permalink
perf: Put canister queues behind Arcs (#3305)
Browse files Browse the repository at this point in the history
This makes unmutated canister queues basically free to clone, which is
important during certification and, more importantly, before and after
every message execution (when the `SystemState` is cloned in order to
allow for rollbacks). After the changes in #3241 and #3304, under heavy
subnet load (millions of best-effort messages), cloning the queues accounts
for about half the message execution time; with this change, it drops to
well under 10%.
  • Loading branch information
alin-at-dfinity authored Dec 30, 2024
1 parent 5716305 commit ab29295
Showing 1 changed file with 38 additions and 27 deletions.
65 changes: 38 additions & 27 deletions rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub struct CanisterQueues {
/// no corresponding message in the message pool; or entry in the compact
/// response maps (which record the `CallbackIds` of expired / shed inbound
/// best-effort responses).
canister_queues: BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
canister_queues: BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,

/// Backing store for `canister_queues` references, combining a `MessagePool`
/// and maps of compact responses (`CallbackIds` of expired / shed responses),
Expand Down Expand Up @@ -180,7 +180,7 @@ pub struct CanisterQueues {
pub struct CanisterOutputQueuesIterator<'a> {
/// Priority queue of non-empty output queues. The next message to be popped
/// / peeked is the one at the front of the first queue.
queues: VecDeque<(&'a CanisterId, &'a mut OutputQueue)>,
queues: VecDeque<(&'a CanisterId, &'a mut Arc<OutputQueue>)>,

/// Mutable store holding the messages referenced by `queues`.
store: &'a mut MessageStoreImpl,
Expand All @@ -194,7 +194,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> {
/// `CanisterQueues::canister_queues` (a map of `CanisterId` to an input queue,
/// output queue pair) and `MessagePool`.
fn new(
queues: &'a mut BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
queues: &'a mut BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
store: &'a mut MessageStoreImpl,
) -> Self {
let queues: VecDeque<_> = queues
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> {
/// Computes the number of (potentially stale) messages left in `queues`.
///
/// Time complexity: `O(n)`.
fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut OutputQueue)>) -> usize {
fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut Arc<OutputQueue>)>) -> usize {
queues.iter().map(|(_, q)| q.len()).sum()
}
}
Expand Down Expand Up @@ -383,10 +383,15 @@ impl MessageStoreImpl {
/// next non-stale reference.
///
/// Panics if the reference at the front of the queue is stale.
fn queue_pop_and_advance<T: Clone>(&mut self, queue: &mut CanisterQueue<T>) -> Option<T>
fn queue_pop_and_advance<T: Clone>(&mut self, queue: &mut Arc<CanisterQueue<T>>) -> Option<T>
where
MessageStoreImpl: MessageStore<T>,
{
if queue.len() == 0 {
return None;
}

let queue = Arc::make_mut(queue);
let reference = queue.pop()?;

// Advance to the next non-stale reference.
Expand Down Expand Up @@ -548,7 +553,7 @@ trait InboundMessageStore: MessageStore<CanisterInput> {
/// Time complexity: `O(n * log(n))`.
fn callbacks_with_enqueued_response(
&self,
canister_queues: &BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
canister_queues: &BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
) -> Result<BTreeSet<CallbackId>, String>;
}

Expand All @@ -561,7 +566,7 @@ impl InboundMessageStore for MessageStoreImpl {

fn callbacks_with_enqueued_response(
&self,
canister_queues: &BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
canister_queues: &BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
) -> Result<BTreeSet<CallbackId>, String> {
let mut callbacks = BTreeSet::new();
canister_queues
Expand Down Expand Up @@ -654,7 +659,11 @@ impl CanisterQueues {
F: FnMut(&CanisterId, &RequestOrResponse) -> Result<(), ()>,
{
for (canister_id, (_, queue)) in self.canister_queues.iter_mut() {
while let Some(reference) = queue.peek() {
loop {
let Some(reference) = queue.peek() else {
break;
};
let queue = Arc::make_mut(queue);
let Some(msg) = self.store.pool.get(reference) else {
// Expired / dropped message. Pop it and advance.
assert_eq!(Some(reference), queue.pop());
Expand Down Expand Up @@ -740,10 +749,10 @@ impl CanisterQueues {
}
// Safe to already (attempt to) reserve an output slot here, as the `push()`
// below is guaranteed to succeed due to the check above.
if let Err(e) = output_queue.try_reserve_response_slot() {
if let Err(e) = Arc::make_mut(output_queue).try_reserve_response_slot() {
return Err((e, msg));
}
input_queue
Arc::make_mut(input_queue)
}
RequestOrResponse::Response(ref response) => {
match self.canister_queues.get_mut(&sender) {
Expand All @@ -768,7 +777,7 @@ impl CanisterQueues {
return Ok(false);
}
}
queue
Arc::make_mut(queue)
}

// Queue does not exist or has no reserved slot for this response.
Expand Down Expand Up @@ -860,7 +869,7 @@ impl CanisterQueues {
}

let reference = self.store.push_inbound_timeout_response(callback_id);
input_queue.push_response(reference);
Arc::make_mut(input_queue).push_response(reference);
self.queue_stats.on_push_timeout_response();

// Add sender canister ID to the appropriate input schedule queue if it is not
Expand Down Expand Up @@ -1076,15 +1085,15 @@ impl CanisterQueues {
if let Err(e) = output_queue.check_has_request_slot() {
return Err((e, request));
}
if let Err(e) = input_queue.try_reserve_response_slot() {
if let Err(e) = Arc::make_mut(input_queue).try_reserve_response_slot() {
return Err((e, request));
}

self.queue_stats
.on_push_request(&request, Context::Outbound);

let reference = self.store.pool.insert_outbound_request(request, time);
output_queue.push_request(reference);
Arc::make_mut(output_queue).push_request(reference);

debug_assert_eq!(Ok(()), self.test_invariants());
Ok(())
Expand Down Expand Up @@ -1113,7 +1122,7 @@ impl CanisterQueues {

let (input_queue, _output_queue) =
get_or_insert_queues(&mut self.canister_queues, &request.receiver);
input_queue.try_reserve_response_slot()?;
Arc::make_mut(input_queue).try_reserve_response_slot()?;
self.queue_stats
.on_push_request(&request, Context::Outbound);
debug_assert_eq!(Ok(()), self.test_invariants());
Expand Down Expand Up @@ -1172,7 +1181,7 @@ impl CanisterQueues {
.expect("pushing response into inexistent output queue")
.1;
let reference = self.store.pool.insert_outbound_response(response);
output_queue.push_response(reference);
Arc::make_mut(output_queue).push_response(reference);

debug_assert_eq!(Ok(()), self.test_invariants());
}
Expand Down Expand Up @@ -1491,12 +1500,13 @@ impl CanisterQueues {
.expect("No matching queue for dropped message.");

if input_queue.peek() == Some(reference) {
let input_queue = Arc::make_mut(input_queue);
input_queue.pop();
self.store.queue_advance(input_queue);
}

// Release the outbound response slot.
output_queue.release_reserved_response_slot();
Arc::make_mut(output_queue).release_reserved_response_slot();
self.queue_stats.on_drop_input_request(&request);
}
}
Expand Down Expand Up @@ -1530,6 +1540,7 @@ impl CanisterQueues {
// a queue containing references `[1, 2]`; `1` and `2` expire as part of the
// same `time_out_messages()` call; `on_message_dropped(1)` will also pop `2`).
if output_queue.peek() == Some(reference) {
let output_queue = Arc::make_mut(output_queue);
output_queue.pop();
self.store.queue_advance(output_queue);
}
Expand All @@ -1550,7 +1561,7 @@ impl CanisterQueues {
.callbacks_with_enqueued_response
.insert(response.originator_reply_callback));
let reference = self.store.insert_inbound(response.into());
input_queue.push_response(reference);
Arc::make_mut(input_queue).push_response(reference);

// If the input queue is not already in a sender schedule, add it.
if input_queue.len() == 1 {
Expand Down Expand Up @@ -1602,7 +1613,7 @@ impl CanisterQueues {
self.input_schedule.test_invariants(
self.canister_queues
.iter()
.map(|(canister_id, (input_queue, _))| (canister_id, input_queue)),
.map(|(canister_id, (input_queue, _))| (canister_id, &**input_queue)),
&input_queue_type_fn,
)
}
Expand Down Expand Up @@ -1659,7 +1670,7 @@ impl CanisterQueues {
///
/// Time complexity: `O(canister_queues.len())`.
fn calculate_queue_stats(
canister_queues: &BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
canister_queues: &BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
guaranteed_response_memory_reservations: usize,
transient_stream_guaranteed_responses_size_bytes: usize,
) -> QueueStats {
Expand All @@ -1684,12 +1695,12 @@ impl CanisterQueues {
/// Written as a free function in order to avoid borrowing the full
/// `CanisterQueues`, which then requires looking up the queues again.
fn get_or_insert_queues<'a>(
canister_queues: &'a mut BTreeMap<CanisterId, (InputQueue, OutputQueue)>,
canister_queues: &'a mut BTreeMap<CanisterId, (Arc<InputQueue>, Arc<OutputQueue>)>,
canister_id: &CanisterId,
) -> (&'a mut InputQueue, &'a mut OutputQueue) {
) -> (&'a mut Arc<InputQueue>, &'a mut Arc<OutputQueue>) {
let (input_queue, output_queue) = canister_queues.entry(*canister_id).or_insert_with(|| {
let input_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY);
let output_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY);
let input_queue = Arc::new(CanisterQueue::new(DEFAULT_QUEUE_CAPACITY));
let output_queue = Arc::new(CanisterQueue::new(DEFAULT_QUEUE_CAPACITY));
(input_queue, output_queue)
});
(input_queue, output_queue)
Expand Down Expand Up @@ -1749,8 +1760,8 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues {
.iter()
.map(|(canid, (iq, oq))| CanisterQueuePair {
canister_id: Some(pb_types::CanisterId::from(*canid)),
input_queue: Some(iq.into()),
output_queue: Some(oq.into()),
input_queue: Some((&**iq).into()),
output_queue: Some((&**oq).into()),
})
.collect(),
pool: if item.store.pool != MessagePool::default() {
Expand Down Expand Up @@ -1826,7 +1837,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can
}
});

Ok((canister_id, (iq, oq)))
Ok((canister_id, (Arc::new(iq), Arc::new(oq))))
})
.collect::<Result<_, Self::Error>>()?;

Expand Down

0 comments on commit ab29295

Please sign in to comment.