Skip to content

Commit

Permalink
RTPS Writer HistoryBuffer garbage collection logic was slow and wrong…
Browse files Browse the repository at this point in the history
…. GitHub issue #359.

Thanks to @nbaldy-hb at GitHub for pointing this out.
  • Loading branch information
jhelovuo committed Dec 11, 2024
1 parent 7d66833 commit ae49aa9
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions src/rtps/writer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
cmp::{max, min},
cmp::max,
collections::{BTreeMap, BTreeSet},
ops::Bound::Included,
rc::Rc,
Expand Down Expand Up @@ -365,7 +365,7 @@ impl Writer {
});

// TODO: Configuration value
let cache_cleaning_period = Duration::from_secs(2 * 60);
let cache_cleaning_period = Duration::from_secs(6);

// Start periodic Heartbeat
if let Some(period) = heartbeat_period {
Expand Down Expand Up @@ -514,13 +514,13 @@ impl Writer {
match self.qos_policies.history {
None => {
// DDS Specification says this is the default History policy
self.remove_all_acked_changes_but_keep_depth(1);
self.remove_all_acked_changes_but_keep_depth(Some(1), resource_limit);
}
Some(History::KeepAll) => {
self.remove_all_acked_changes_but_keep_depth(resource_limit);
self.remove_all_acked_changes_but_keep_depth(None, resource_limit);
}
Some(History::KeepLast { depth: d }) => {
self.remove_all_acked_changes_but_keep_depth(min(d as usize, resource_limit));
self.remove_all_acked_changes_but_keep_depth(Some(d as usize), resource_limit);
}
}
}
Expand Down Expand Up @@ -1364,7 +1364,11 @@ impl Writer {
/// (Reliable) Depth is QoS policy History depth.
/// Returns SequenceNumbers of removed CacheChanges
/// This is called repeatedly by handle_cache_cleaning action.
fn remove_all_acked_changes_but_keep_depth(&mut self, depth: usize) {
fn remove_all_acked_changes_but_keep_depth(
&mut self,
depth: Option<usize>,
resource_limit: usize,
) {
let first_keeper = if !self.like_stateless {
// Regular stateful writer behavior
// All readers have acked up to this point (SequenceNumber)
Expand All @@ -1376,16 +1380,35 @@ impl Writer {
.unwrap_or_else(SequenceNumber::zero);
// If all readers have acked all up to before 5, and depth is 5, we need
// to keep samples 0..4, i.e. from acked_up_to_before - depth .
max(
acked_by_all_readers - SequenceNumber::from(depth),
self.history_buffer.first_change_sequence_number(),
)
if let Some(depth) = depth {
max(
acked_by_all_readers - SequenceNumber::from(depth),
self.history_buffer.first_change_sequence_number(),
)
} else {
// try to keep all
self.history_buffer.first_change_sequence_number()
}
} else {
// Stateless-like writer currently supports only BestEffort behavior, so here we
// make it explicit that it does not care about acked sequence numbers
self.history_buffer.first_change_sequence_number()
let depth = depth.unwrap_or(0);
max(
self.history_buffer.last_change_sequence_number() - SequenceNumber::from(depth),
self.history_buffer.first_change_sequence_number(),
)
};

let first_keeper = max(
max(
first_keeper,
self.history_buffer.last_change_sequence_number() - SequenceNumber::from(resource_limit),
),
SequenceNumber::zero(),
);
warn!(
"HistoryBuffer: cleaning before {first_keeper:?} topic={:?}",
self.topic_name()
);
// actual cleaning
self.history_buffer.remove_changes_before(first_keeper);
}
Expand Down

0 comments on commit ae49aa9

Please sign in to comment.