From b9202df99fd62be74f7fb628d4cf0e8a448ec6e4 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 7 Mar 2024 21:43:13 +0800 Subject: [PATCH] fix: xline watcher will loss event Closes: #677 Signed-off-by: Phoeniix Zhao --- crates/xline/src/storage/kvwatcher.rs | 39 +++++++++++++++------------ 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 174ee0a9e..b4a81c121 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -8,15 +8,13 @@ use std::{ time::Duration, }; -use clippy_utilities::OverflowArithmetic; use itertools::Itertools; -use log::warn; use parking_lot::RwLock; use tokio::{ sync::mpsc::{self, error::TrySendError}, time::sleep, }; -use tracing::debug; +use tracing::{debug, warn}; use utils::{ parking_lot_lock::RwLockMap, task_manager::{tasks::TaskName, Listener, TaskManager}, @@ -62,6 +60,9 @@ struct Watcher { event_tx: mpsc::Sender, /// Compacted flag compacted: bool, + /// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed + /// Store the revision that has been notified + notified_set: HashSet, } impl PartialEq for Watcher { @@ -97,6 +98,7 @@ impl Watcher { stop_notify, event_tx, compacted, + notified_set: HashSet::new(), } } @@ -114,10 +116,10 @@ impl Watcher { fn filter_events(&self, mut events: Vec) -> Vec { events.retain(|event| { self.filters.iter().all(|filter| filter != &event.r#type) - && (event - .kv - .as_ref() - .map_or(false, |kv| kv.mod_revision >= self.start_rev)) + && (event.kv.as_ref().map_or(false, |kv| { + kv.mod_revision >= self.start_rev + && !self.notified_set.contains(&kv.mod_revision) + })) }); events } @@ -136,26 +138,29 @@ impl Watcher { revision, compacted: self.compacted, }; - if !self.compacted { - if revision < self.start_rev || 0 == events_len { - return Ok(()); - } - debug!(watch_id, revision, events_len, "try to send watch response"); + if !self.compacted + && (revision < self.start_rev + || self.notified_set.contains(&revision) + || 0 == events_len) + { + return Ok(()); }; match self.event_tx.try_send(watch_event) { Ok(_) => { - debug!(watch_id, revision, "response sent successfully"); - self.start_rev = revision.overflow_add(1); + assert!( + self.notified_set.insert(revision), + "should not notify the same revision twice!!" + ); Ok(()) } Err(TrySendError::Closed(_)) => { - debug!(watch_id, revision, "watcher is closed"); + warn!(watch_id, revision, "watcher is closed"); self.stop_notify.notify(1); Ok(()) } Err(TrySendError::Full(watch_event)) => { - debug!( + warn!( watch_id, revision, "events channel is full, will try to send later" ); @@ -590,7 +595,7 @@ mod test { use std::{collections::BTreeMap, time::Duration}; - use clippy_utilities::Cast; + use clippy_utilities::{Cast, OverflowArithmetic}; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; use utils::config::EngineConfig;