Skip to content

Commit

Permalink
fix: xline watcher will loss event
Browse files Browse the repository at this point in the history
Closes: #677
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 authored and mergify[bot] committed Mar 13, 2024
1 parent d0c8061 commit 72f3cf9
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions crates/xline/src/storage/kvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use std::{
time::Duration,
};

use clippy_utilities::OverflowArithmetic;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use tokio::{
sync::mpsc::{self, error::TrySendError},
time::sleep,
};
use tracing::debug;
use tracing::{debug, warn};
use utils::{
parking_lot_lock::RwLockMap,
task_manager::{tasks::TaskName, Listener, TaskManager},
Expand Down Expand Up @@ -63,6 +61,9 @@ struct Watcher {
event_tx: mpsc::Sender<WatchEvent>,
/// Compacted flag
compacted: bool,
/// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed
/// Store the revision that has been notified
notified_set: HashSet<i64>,
}

impl PartialEq for Watcher {
Expand Down Expand Up @@ -98,6 +99,7 @@ impl Watcher {
stop_notify,
event_tx,
compacted,
notified_set: HashSet::new(),
}
}

Expand All @@ -115,10 +117,10 @@ impl Watcher {
fn filter_events(&self, mut events: Vec<Event>) -> Vec<Event> {
events.retain(|event| {
self.filters.iter().all(|filter| filter != &event.r#type)
&& (event
.kv
.as_ref()
.map_or(false, |kv| kv.mod_revision >= self.start_rev))
&& (event.kv.as_ref().map_or(false, |kv| {
kv.mod_revision >= self.start_rev
&& !self.notified_set.contains(&kv.mod_revision)
}))
});
events
}
Expand All @@ -137,26 +139,26 @@ impl Watcher {
revision,
compacted: self.compacted,
};
if !self.compacted {
if revision < self.start_rev || 0 == events_len {
return Ok(());
}
debug!(watch_id, revision, events_len, "try to send watch response");
if !self.compacted
&& (revision < self.start_rev
|| self.notified_set.contains(&revision)
|| 0 == events_len)
{
return Ok(());
};

match self.event_tx.try_send(watch_event) {
Ok(_) => {
debug!(watch_id, revision, "response sent successfully");
self.start_rev = revision.overflow_add(1);
let _ignore = self.notified_set.insert(revision);
Ok(())
}
Err(TrySendError::Closed(_)) => {
debug!(watch_id, revision, "watcher is closed");
warn!(watch_id, revision, "watcher is closed");
self.stop_notify.notify(1);
Ok(())
}
Err(TrySendError::Full(watch_event)) => {
debug!(
warn!(
watch_id,
revision, "events channel is full, will try to send later"
);
Expand Down Expand Up @@ -602,7 +604,7 @@ mod test {

use std::{collections::BTreeMap, time::Duration};

use clippy_utilities::Cast;
use clippy_utilities::{Cast, OverflowArithmetic};
use test_macros::abort_on_panic;
use tokio::time::{sleep, timeout};
use utils::config::EngineConfig;
Expand Down

0 comments on commit 72f3cf9

Please sign in to comment.