Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement non-linear wait before drop(close) #1603

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,26 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// Limits the deadline growth for multi-fragment messages.
/// When sending a multi-fragment message, the deadline of each consecutive fragment,
/// measured from the start of the message transmission, is calculated as
/// wait_before_drop * 2 ^ min(N, ttl), where N is the fragment number.
/// This parameter therefore caps the total multi-fragment message transmission
/// time at wait_before_drop * 2 ^ ttl.
ttl: 8,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
/// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message
/// if still no batch is available.
wait_before_close: 5000000,
/// Limits the deadline growth for multi-fragment messages.
/// When sending a multi-fragment message, the deadline of each consecutive fragment,
/// measured from the start of the message transmission, is calculated as
/// wait_before_close * 2 ^ min(N, ttl), where N is the fragment number.
/// This parameter therefore caps the total multi-fragment message transmission
/// time at wait_before_close * 2 ^ ttl.
ttl: 2,
},
},
/// Perform batching of messages if they are smaller of the batch_size
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf {
/// Defaults mirror DEFAULT_CONFIG.json5: wait up to 1000 µs for a free batch
/// before dropping a droppable message, with at most 8 doublings (`ttl`) of
/// the per-fragment deadline for multi-fragment messages.
fn default() -> Self {
Self {
wait_before_drop: 1000,
ttl: 8,
}
}
}
Expand All @@ -261,6 +262,7 @@ impl Default for CongestionControlBlockConf {
/// Defaults mirror DEFAULT_CONFIG.json5: wait up to 5,000,000 µs (5 s) for a
/// free batch before closing the transport session on a blocking message,
/// with at most 2 doublings (`ttl`) of the per-fragment deadline for
/// multi-fragment messages.
fn default() -> Self {
Self {
wait_before_close: 5000000,
ttl: 2,
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,26 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message
/// if still no batch is available.
wait_before_drop: i64,
/// Limits the deadline growth for multi-fragment messages.
/// When sending a multi-fragment message, the deadline of each consecutive fragment,
/// measured from the start of the message transmission, is calculated as
/// wait_before_drop * 2 ^ min(N, ttl), where N is the fragment number.
/// This parameter therefore caps the total multi-fragment message transmission
/// time at wait_before_drop * 2 ^ ttl.
ttl: usize,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
pub block: CongestionControlBlockConf {
/// The maximum time in microseconds to wait for an available batch before closing the transport session
/// when sending a blocking message if still no batch is available.
wait_before_close: i64,
/// Limits the deadline growth for multi-fragment messages.
/// When sending a multi-fragment message, the deadline of each consecutive fragment,
/// measured from the start of the message transmission, is calculated as
/// wait_before_close * 2 ^ min(N, ttl), where N is the fragment number.
/// This parameter therefore caps the total multi-fragment message transmission
/// time at wait_before_close * 2 ^ ttl.
ttl: usize,
},
},
pub batching: BatchingConf {
Expand Down
79 changes: 56 additions & 23 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,30 @@ impl StageInMutex {
}
}

/// Exponentially growing wait interval with a bounded number of doublings.
///
/// Drives per-fragment deadlines: every `advance` pushes the deadline forward
/// by the current `wait_time` and then doubles it, until `ttl` doublings have
/// been consumed, after which the deadline stops growing.
struct WaitTime {
    wait_time: Duration,
    ttl: usize,
}

impl WaitTime {
    fn new(wait_time: Duration, ttl: usize) -> Self {
        Self { wait_time, ttl }
    }

    /// Extend `instant` by the current wait time, then double the wait time
    /// for the next round; a no-op once the `ttl` budget is exhausted.
    fn advance(&mut self, instant: &mut Instant) {
        if self.ttl == 0 {
            return;
        }
        self.ttl -= 1;
        *instant += self.wait_time;
        self.wait_time *= 2;
    }

    /// The current wait interval (doubles after each effective `advance`).
    fn wait_time(&self) -> Duration {
        self.wait_time
    }
}

#[derive(Clone)]
enum DeadlineSetting {
Immediate,
Infinite,
Expand All @@ -136,37 +160,44 @@ enum DeadlineSetting {

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
wait_time: Option<WaitTime>,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Option<WaitTime>) -> Self {
Self {
deadline: None,
wait_time,
}
}

fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
match self.deadline().to_owned() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
DeadlineSetting::Finite(mut instant) => {
// SAFETY: this is safe because DeadlineSetting::Finite is returned by
// deadline() only if wait_time is Some(_)
let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() };
wait_time.advance(&mut instant);

self.deadline = Some(DeadlineSetting::Finite(instant));
}
}
}

#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
self.deadline
.get_or_insert_with(|| match self.wait_time.as_mut() {
Some(wait_time) => match wait_time.wait_time() {
Duration::ZERO => DeadlineSetting::Immediate,
nonzero_wait_time => {
DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time))
}
},
None => DeadlineSetting::Infinite,
})
}
}

Expand All @@ -175,9 +206,11 @@ struct Deadline {
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
fn new(wait_time: Option<(Duration, usize)>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
lazy_deadline: LazyDeadline::new(
wait_time.map(|(wait_time, ttl)| WaitTime::new(wait_time, ttl)),
),
}
}

Expand Down Expand Up @@ -573,8 +606,8 @@ impl StageOut {
pub(crate) struct TransmissionPipelineConf {
pub(crate) batch: BatchConfig,
pub(crate) queue_size: [usize; Priority::NUM],
pub(crate) wait_before_drop: Duration,
pub(crate) wait_before_close: Duration,
pub(crate) wait_before_drop: (Duration, usize),
pub(crate) wait_before_close: (Duration, usize),
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
}
Expand Down Expand Up @@ -676,8 +709,8 @@ pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
}

impl TransmissionPipelineProducer {
Expand Down Expand Up @@ -852,8 +885,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
batching_time_limit: Duration::from_micros(1),
};

Expand All @@ -866,8 +899,8 @@ mod tests {
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
wait_before_drop: Duration::from_millis(1),
wait_before_close: Duration::from_secs(5),
wait_before_drop: (Duration::from_millis(1), 10),
wait_before_close: (Duration::from_secs(5), 10),
batching_time_limit: Duration::from_micros(1),
};

Expand Down
48 changes: 24 additions & 24 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ pub struct TransportManagerConfig {
pub resolution: Resolution,
pub batch_size: BatchSize,
pub batching: bool,
pub wait_before_drop: Duration,
pub wait_before_close: Duration,
pub wait_before_drop: (Duration, usize),
pub wait_before_close: (Duration, usize),
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub defrag_buff_size: usize,
Expand Down Expand Up @@ -141,8 +141,8 @@ pub struct TransportManagerBuilder {
batch_size: BatchSize,
batching_enabled: bool,
batching_time_limit: Duration,
wait_before_drop: Duration,
wait_before_close: Duration,
wait_before_drop: (Duration, usize),
wait_before_close: (Duration, usize),
queue_size: QueueSizeConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
Expand Down Expand Up @@ -192,12 +192,12 @@ impl TransportManagerBuilder {
self
}

pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self {
pub fn wait_before_drop(mut self, wait_before_drop: (Duration, usize)) -> Self {
self.wait_before_drop = wait_before_drop;
self
}

pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self {
pub fn wait_before_close(mut self, wait_before_close: (Duration, usize)) -> Self {
self.wait_before_close = wait_before_close;
self
}
Expand Down Expand Up @@ -249,6 +249,8 @@ impl TransportManagerBuilder {
}

let link = config.transport().link();
let cc_drop = link.tx().queue().congestion_control().drop();
let cc_block = link.tx().queue().congestion_control().block();
let mut resolution = Resolution::default();
resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
self = self.resolution(resolution);
Expand All @@ -259,21 +261,13 @@ impl TransportManagerBuilder {
));
self = self.defrag_buff_size(*link.rx().max_message_size());
self = self.link_rx_buffer_size(*link.rx().buffer_size());
self = self.wait_before_drop(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.drop()
.wait_before_drop(),
self = self.wait_before_drop((
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
));
self = self.wait_before_close(duration_from_i64us(
*link
.tx()
.queue()
.congestion_control()
.block()
.wait_before_close(),
self = self.wait_before_close((
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
));
self = self.queue_size(link.tx().queue().size().clone());
self = self.tx_threads(*link.tx().threads());
Expand Down Expand Up @@ -372,17 +366,23 @@ impl Default for TransportManagerBuilder {
let link_rx = LinkRxConf::default();
let queue = QueueConf::default();
let backoff = *queue.batching().time_limit();
let wait_before_drop = *queue.congestion_control().drop().wait_before_drop();
let wait_before_close = *queue.congestion_control().block().wait_before_close();
let cc_drop = queue.congestion_control().drop();
let cc_block = queue.congestion_control().block();
Self {
version: VERSION,
zid: ZenohIdProto::rand(),
whatami: zenoh_config::defaults::mode,
resolution: Resolution::default(),
batch_size: BatchSize::MAX,
batching_enabled: true,
wait_before_drop: duration_from_i64us(wait_before_drop),
wait_before_close: duration_from_i64us(wait_before_close),
wait_before_drop: (
duration_from_i64us(*cc_drop.wait_before_drop()),
*cc_drop.ttl(),
),
wait_before_close: (
duration_from_i64us(*cc_block.wait_before_close()),
*cc_block.ttl(),
),
queue_size: queue.size,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
Expand Down