Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: xline watcher will loss event #678

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions crates/xline/src/storage/kvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ use std::{
time::Duration,
};

use clippy_utilities::OverflowArithmetic;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use tokio::{
sync::mpsc::{self, error::TrySendError},
time::sleep,
};
use tracing::debug;
use tracing::{debug, warn};
use utils::{
parking_lot_lock::RwLockMap,
task_manager::{tasks::TaskName, Listener, TaskManager},
Expand Down Expand Up @@ -62,6 +60,9 @@ struct Watcher {
event_tx: mpsc::Sender<WatchEvent>,
/// Compacted flag
compacted: bool,
/// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed
/// Store the revision that has been notified
notified_set: HashSet<i64>,
}

impl PartialEq for Watcher {
Expand Down Expand Up @@ -97,6 +98,7 @@ impl Watcher {
stop_notify,
event_tx,
compacted,
notified_set: HashSet::new(),
}
}

Expand All @@ -114,10 +116,10 @@ impl Watcher {
fn filter_events(&self, mut events: Vec<Event>) -> Vec<Event> {
events.retain(|event| {
self.filters.iter().all(|filter| filter != &event.r#type)
&& (event
.kv
.as_ref()
.map_or(false, |kv| kv.mod_revision >= self.start_rev))
&& (event.kv.as_ref().map_or(false, |kv| {
kv.mod_revision >= self.start_rev
&& !self.notified_set.contains(&kv.mod_revision)
}))
});
events
}
Expand All @@ -136,26 +138,26 @@ impl Watcher {
revision,
compacted: self.compacted,
};
if !self.compacted {
if revision < self.start_rev || 0 == events_len {
return Ok(());
}
debug!(watch_id, revision, events_len, "try to send watch response");
if !self.compacted
&& (revision < self.start_rev
|| self.notified_set.contains(&revision)
|| 0 == events_len)
{
return Ok(());
};

match self.event_tx.try_send(watch_event) {
Ok(_) => {
debug!(watch_id, revision, "response sent successfully");
self.start_rev = revision.overflow_add(1);
let _ignore = self.notified_set.insert(revision);
Ok(())
}
Err(TrySendError::Closed(_)) => {
debug!(watch_id, revision, "watcher is closed");
warn!(watch_id, revision, "watcher is closed");
self.stop_notify.notify(1);
Ok(())
}
Err(TrySendError::Full(watch_event)) => {
debug!(
warn!(
watch_id,
revision, "events channel is full, will try to send later"
);
Expand Down Expand Up @@ -590,7 +592,7 @@ mod test {

use std::{collections::BTreeMap, time::Duration};

use clippy_utilities::Cast;
use clippy_utilities::{Cast, OverflowArithmetic};
use test_macros::abort_on_panic;
use tokio::time::{sleep, timeout};
use utils::config::EngineConfig;
Expand Down
Loading