Skip to content

Commit

Permalink
Add Wait TTL to the config
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Nov 22, 2024
1 parent dfde2cc commit 1ca7057
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 47 deletions.
14 changes: 14 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,26 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_drop * 2 ^ ttl
ttl: 8,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_close * 2 ^ ttl
ttl: 2,
},
},
/// Perform batching of messages if they are smaller of the batch_size
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf {
fn default() -> Self {
Self {
wait_before_drop: 1000,
ttl: 8,
}
}
}
Expand All @@ -261,6 +262,7 @@ impl Default for CongestionControlBlockConf {
fn default() -> Self {
Self {
wait_before_close: 5000000,
ttl: 2,
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,26 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message
/// if still no batch is available.
wait_before_drop: i64,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_drop * 2 ^ ttl
ttl: usize,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
/// The maximum time in microseconds to wait for an available batch before closing the transport session
/// when sending a blocking message if still no batch is available.
wait_before_close: i64,
/// The maximum deadline limit for multi-fragment messages.
/// When sending multi-fragment message, for each consecutive fragment the deadline from
/// the start point of message transmission is calculated as
/// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number.
/// Thus, this parameter allows setting a limitation for maximum time for multi-fragment
/// message transmission as wait_before_close * 2 ^ ttl
ttl: usize,
},
},
pub batching: BatchingConf {
Expand Down
46 changes: 23 additions & 23 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,15 @@ impl WaitTime {
Self { wait_time, ttl }
}

fn wait_time(&mut self) -> Duration {
let result = self.wait_time;
match self.ttl.checked_sub(1) {
Some(new_ttl) => {
self.ttl = new_ttl;
self.wait_time = result * 2;
}
None => {
self.wait_time = Duration::ZERO;
}
fn advance(&mut self) {
if let Some(new_ttl) = self.ttl.checked_sub(1) {
self.ttl = new_ttl;
self.wait_time *= 2;
}
result
}

fn wait_time(&self) -> Duration {
self.wait_time
}
}

Expand Down Expand Up @@ -180,8 +177,11 @@ impl LazyDeadline {
DeadlineSetting::Finite(mut instant) => {
// SAFETY: this is safe because DeadlineSetting::Finite is returned by
// deadline() only if wait_time is Some(_)
instant =
instant.add(unsafe { self.wait_time.as_mut().unwrap_unchecked().wait_time() });
let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() };

instant = instant.add(wait_time.wait_time());
wait_time.advance();

self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
Expand All @@ -207,10 +207,10 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Option<(Duration, usize)>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(
wait_time.map(|wait_time| WaitTime::new(wait_time, 10)),
wait_time.map(|(wait_time, ttl)| WaitTime::new(wait_time, ttl)),
),
}
}
Expand Down Expand Up @@ -607,8 +607,8 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_close: Duration,
pub(crate) wait_before_drop: (Duration, usize),
pub(crate) wait_before_close: (Duration, usize),
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
Expand Down Expand Up @@ -710,8 +710,8 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
}

impl TransmissionPipelineProducer {
Expand Down Expand Up @@ -886,8 +886,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
batching_time_limit: Duration::from_micros(1),
};

Expand All @@ -900,8 +900,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
batching_time_limit: Duration::from_micros(1),
};

Expand Down
48 changes: 24 additions & 24 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_close: Duration,
pub wait_before_drop: (Duration, usize),
pub wait_before_close: (Duration, usize),
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
Expand Down Expand Up @@ -141,8 +141,8 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
Expand Down Expand Up @@ -192,12 +192,12 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, usize)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}

pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self {
pub fn wait_before_close(mut self, wait_before_close: (Duration, usize)) -> Self {
self.wait_before_close = wait_before_close;
self
}
Expand Down Expand Up @@ -249,6 +249,8 @@ impl TransportManagerBuilder {
}

let link = config.transport().link();
let cc_drop = link.tx().queue().congestion_control().drop();
let cc_block = link.tx().queue().congestion_control().block();
let mut resolution = Resolution::default();
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
Expand All @@ -259,21 +261,13 @@ impl TransportManagerBuilder {
));
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.drop()
.wait_before_drop(),
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
));
self = self.wait_before_close(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.block()
.wait_before_close(),
self = self.wait_before_close((
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
Expand Down Expand Up @@ -372,17 +366,23 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().drop().wait_before_drop();
let wait_before_close = *queue.congestion_control().block().wait_before_close();
let cc_drop = queue.congestion_control().drop();
let cc_block = queue.congestion_control().block();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: duration_from_i64us(wait_before_drop),
wait_before_close: duration_from_i64us(wait_before_close),
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
),
wait_before_close: (
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down

0 comments on commit 1ca7057

Please sign in to comment.