diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 6e80b0756..dc008019f 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -8,15 +8,13 @@ use std::{ time::Duration, }; -use clippy_utilities::OverflowArithmetic; use itertools::Itertools; -use log::warn; use parking_lot::RwLock; use tokio::{ sync::mpsc::{self, error::TrySendError}, time::sleep, }; -use tracing::debug; +use tracing::{debug, warn}; use utils::{ parking_lot_lock::RwLockMap, task_manager::{tasks::TaskName, Listener, TaskManager}, @@ -63,6 +61,9 @@ struct Watcher { event_tx: mpsc::Sender, /// Compacted flag compacted: bool, + /// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed + /// Store the revision that has been notified + notified_set: HashSet, } impl PartialEq for Watcher { @@ -98,6 +99,7 @@ impl Watcher { stop_notify, event_tx, compacted, + notified_set: HashSet::new(), } } @@ -115,10 +117,10 @@ impl Watcher { fn filter_events(&self, mut events: Vec) -> Vec { events.retain(|event| { self.filters.iter().all(|filter| filter != &event.r#type) - && (event - .kv - .as_ref() - .map_or(false, |kv| kv.mod_revision >= self.start_rev)) + && (event.kv.as_ref().map_or(false, |kv| { + kv.mod_revision >= self.start_rev + && !self.notified_set.contains(&kv.mod_revision) + })) }); events } @@ -137,26 +139,26 @@ impl Watcher { revision, compacted: self.compacted, }; - if !self.compacted { - if revision < self.start_rev || 0 == events_len { - return Ok(()); - } - debug!(watch_id, revision, events_len, "try to send watch response"); + if !self.compacted + && (revision < self.start_rev + || self.notified_set.contains(&revision) + || 0 == events_len) + { + return Ok(()); }; match self.event_tx.try_send(watch_event) { Ok(_) => { - debug!(watch_id, revision, "response sent successfully"); - self.start_rev = revision.overflow_add(1); + let _ignore = self.notified_set.insert(revision); Ok(()) } Err(TrySendError::Closed(_)) => { - debug!(watch_id, revision, "watcher is closed"); + warn!(watch_id, revision, "watcher is closed"); self.stop_notify.notify(1); Ok(()) } Err(TrySendError::Full(watch_event)) => { - debug!( + warn!( watch_id, revision, "events channel is full, will try to send later" ); @@ -602,7 +604,7 @@ mod test { use std::{collections::BTreeMap, time::Duration}; - use clippy_utilities::Cast; + use clippy_utilities::{Cast, OverflowArithmetic}; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; use utils::config::EngineConfig;