Skip to content

Commit

Permalink
Merge branch 'master' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
GFX9 authored Mar 19, 2024
2 parents dbdb714 + e56a82d commit d3f6b87
Show file tree
Hide file tree
Showing 15 changed files with 399 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ impl std::fmt::Debug for FilePipeline {

#[cfg(test)]
mod tests {
use crate::server::storage::wal::util::get_file_paths_with_ext;

use super::*;
use crate::server::storage::wal::util::get_file_paths_with_ext;

#[tokio::test]
async fn file_pipeline_is_ok() {
Expand Down
6 changes: 2 additions & 4 deletions crates/curp/src/server/storage/wal/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use tokio::{
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

use crate::log_entry::LogEntry;

use super::{
codec::{DataFrame, WAL},
error::{CorruptType, WALError},
util::{get_checksum, parse_u64, validate_data, LockedFile},
WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION,
};
use crate::log_entry::LogEntry;

/// The size of wal file header in bytes
const WAL_HEADER_SIZE: usize = 56;
Expand Down Expand Up @@ -415,9 +414,8 @@ mod tests {

use curp_test_utils::test_cmd::TestCommand;

use crate::log_entry::EntryData;

use super::*;
use crate::log_entry::EntryData;

#[tokio::test]
async fn segment_state_transition_is_correct() {
Expand Down
6 changes: 4 additions & 2 deletions crates/curp/src/tracker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![allow(unused)] // TODO remove when used

use std::collections::VecDeque;
use std::ops::{AddAssign, Sub};
use std::{
collections::VecDeque,
ops::{AddAssign, Sub},
};

use clippy_utilities::NumericCast;

Expand Down
20 changes: 20 additions & 0 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,26 @@ pub mod tracing;
use ::tracing::debug;
pub use parser::*;

/// display all elements for the given vector
#[macro_export]
macro_rules! write_vec {
($f:expr, $name:expr, $vector:expr) => {
write!($f, "{}: [ ", { $name })?;
let last_idx = if $vector.is_empty() {
0
} else {
$vector.len() - 1
};
for (idx, element) in $vector.iter().enumerate() {
write!($f, "{}", element)?;
if idx != last_idx {
write!($f, ",")?;
}
}
write!($f, "]")?;
};
}

/// Get current timestamp in seconds
#[must_use]
#[inline]
Expand Down
12 changes: 9 additions & 3 deletions crates/xline/src/server/auth_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ where
&self,
mut request: tonic::Request<AuthUserAddRequest>,
) -> Result<tonic::Response<AuthUserAddResponse>, tonic::Status> {
debug!("Receive AuthUserAddRequest {:?}", request);
let user_add_req = request.get_mut();
debug!("Receive AuthUserAddRequest {}", user_add_req);
user_add_req.validation()?;
let hashed_password = Self::hash_password(user_add_req.password.as_bytes());
user_add_req.hashed_password = hashed_password;
Expand Down Expand Up @@ -244,7 +244,10 @@ where
&self,
request: tonic::Request<AuthRoleGrantPermissionRequest>,
) -> Result<tonic::Response<AuthRoleGrantPermissionResponse>, tonic::Status> {
debug!("Receive AuthRoleGrantPermissionRequest {:?}", request);
debug!(
"Receive AuthRoleGrantPermissionRequest {}",
request.get_ref()
);
request.get_ref().validation()?;
self.handle_req(request, false).await
}
Expand All @@ -253,7 +256,10 @@ where
&self,
request: tonic::Request<AuthRoleRevokePermissionRequest>,
) -> Result<tonic::Response<AuthRoleRevokePermissionResponse>, tonic::Status> {
debug!("Receive AuthRoleRevokePermissionRequest {:?}", request);
debug!(
"Receive AuthRoleRevokePermissionRequest {}",
request.get_ref()
);
self.handle_req(request, false).await
}
}
14 changes: 7 additions & 7 deletions crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ where
) -> Result<tonic::Response<RangeResponse>, tonic::Status> {
let range_req = request.get_ref();
range_req.validation()?;
debug!("Receive grpc request: {:?}", range_req);
debug!("Receive grpc request: {}", range_req);
range_req.check_revision(
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
Expand Down Expand Up @@ -244,7 +244,7 @@ where
) -> Result<tonic::Response<PutResponse>, tonic::Status> {
let put_req: &PutRequest = request.get_ref();
put_req.validation()?;
debug!("Receive grpc request: {:?}", put_req);
debug!("Receive grpc request: {}", put_req);
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let is_fast_path = true;
let (cmd_res, sync_res) = self
Expand All @@ -253,7 +253,7 @@ where
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {:?} for PutRequest", revision);
debug!("Get revision {} for PutRequest", revision);
Self::update_header_revision(&mut res, revision);
}
if let Response::ResponsePut(response) = res {
Expand All @@ -273,7 +273,7 @@ where
) -> Result<tonic::Response<DeleteRangeResponse>, tonic::Status> {
let delete_range_req = request.get_ref();
delete_range_req.validation()?;
debug!("Receive grpc request: {:?}", delete_range_req);
debug!("Receive grpc request: {}", delete_range_req);
let auth_info = self.auth_storage.try_get_auth_info_from_request(&request)?;
let is_fast_path = true;
let (cmd_res, sync_res) = self
Expand All @@ -282,7 +282,7 @@ where
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {:?} for DeleteRangeRequest", revision);
debug!("Get revision {} for DeleteRangeRequest", revision);
Self::update_header_revision(&mut res, revision);
}
if let Response::ResponseDeleteRange(response) = res {
Expand All @@ -303,7 +303,7 @@ where
) -> Result<tonic::Response<TxnResponse>, tonic::Status> {
let txn_req = request.get_ref();
txn_req.validation()?;
debug!("Receive grpc request: {:?}", txn_req);
debug!("Receive grpc request: {}", txn_req);
txn_req.check_revision(
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
Expand All @@ -326,7 +326,7 @@ where
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
let revision = sync_res.revision();
debug!("Get revision {:?} for TxnRequest", revision);
debug!("Get revision {} for TxnRequest", revision);
Self::update_header_revision(&mut res, revision);
}
res
Expand Down
3 changes: 1 addition & 2 deletions crates/xline/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ mod watch_server;
/// Xline server
mod xline_server;

pub(crate) use self::auth_server::get_token;
pub(crate) use self::maintenance::MAINTENANCE_SNAPSHOT_CHUNK_SIZE;
pub use self::xline_server::XlineServer;
pub(crate) use self::{auth_server::get_token, maintenance::MAINTENANCE_SNAPSHOT_CHUNK_SIZE};
50 changes: 32 additions & 18 deletions crates/xline/src/storage/kvwatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ use std::{
time::Duration,
};

use clippy_utilities::OverflowArithmetic;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use tokio::{
sync::mpsc::{self, error::TrySendError},
time::sleep,
};
use tracing::debug;
use tracing::{debug, warn};
use utils::{
parking_lot_lock::RwLockMap,
task_manager::{tasks::TaskName, Listener, TaskManager},
write_vec,
};
use xlineapi::command::KeyRange;

Expand Down Expand Up @@ -62,6 +61,9 @@ struct Watcher {
event_tx: mpsc::Sender<WatchEvent>,
/// Compacted flag
compacted: bool,
/// TODO: remove it when https://github.com/xline-kv/Xline/issues/491 has been closed
/// Store the revision that has been notified
notified_set: HashSet<i64>,
}

impl PartialEq for Watcher {
Expand Down Expand Up @@ -97,6 +99,7 @@ impl Watcher {
stop_notify,
event_tx,
compacted,
notified_set: HashSet::new(),
}
}

Expand All @@ -114,10 +117,10 @@ impl Watcher {
fn filter_events(&self, mut events: Vec<Event>) -> Vec<Event> {
events.retain(|event| {
self.filters.iter().all(|filter| filter != &event.r#type)
&& (event
.kv
.as_ref()
.map_or(false, |kv| kv.mod_revision >= self.start_rev))
&& (event.kv.as_ref().map_or(false, |kv| {
kv.mod_revision >= self.start_rev
&& !self.notified_set.contains(&kv.mod_revision)
}))
});
events
}
Expand All @@ -136,26 +139,26 @@ impl Watcher {
revision,
compacted: self.compacted,
};
if !self.compacted {
if revision < self.start_rev || 0 == events_len {
return Ok(());
}
debug!(watch_id, revision, events_len, "try to send watch response");
if !self.compacted
&& (revision < self.start_rev
|| self.notified_set.contains(&revision)
|| 0 == events_len)
{
return Ok(());
};

match self.event_tx.try_send(watch_event) {
Ok(_) => {
debug!(watch_id, revision, "response sent successfully");
self.start_rev = revision.overflow_add(1);
let _ignore = self.notified_set.insert(revision);
Ok(())
}
Err(TrySendError::Closed(_)) => {
debug!(watch_id, revision, "watcher is closed");
warn!(watch_id, revision, "watcher is closed");
self.stop_notify.notify(1);
Ok(())
}
Err(TrySendError::Full(watch_event)) => {
debug!(
warn!(
watch_id,
revision, "events channel is full, will try to send later"
);
Expand Down Expand Up @@ -540,7 +543,6 @@ where
}

/// Watch Event
#[derive(Debug)]
pub(crate) struct WatchEvent {
/// Watch ID
id: WatchId,
Expand All @@ -552,6 +554,18 @@ pub(crate) struct WatchEvent {
compacted: bool,
}

impl std::fmt::Debug for WatchEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"WatchEvent {{ id: {}, revision: {}, compacted: {}, ",
self.id, self.revision, self.compacted,
)?;
write_vec!(f, "events", self.events);
write!(f, " }}")
}
}

impl WatchEvent {
/// Get revision
pub(crate) fn revision(&self) -> i64 {
Expand Down Expand Up @@ -590,7 +604,7 @@ mod test {

use std::{collections::BTreeMap, time::Duration};

use clippy_utilities::Cast;
use clippy_utilities::{Cast, OverflowArithmetic};
use test_macros::abort_on_panic;
use tokio::time::{sleep, timeout};
use utils::config::EngineConfig;
Expand Down
3 changes: 1 addition & 2 deletions crates/xline/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ pub(crate) mod revision;
/// Persistent storage abstraction
pub(crate) mod storage_api;

pub use self::revision::Revision;
pub(crate) use self::{
alarm_store::AlarmStore, auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore,
};

pub use self::revision::Revision;
4 changes: 2 additions & 2 deletions crates/xline/src/utils/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ pub struct ServerArgs {
#[clap(long)]
name: String,
/// Node peer listen urls
#[clap(long, num_args = 1.., value_delimiter = ',')]
#[clap(long, required = true, num_args = 1.., value_delimiter = ',')]
peer_listen_urls: Vec<String>,
/// Node peer advertise urls
#[clap(long, num_args = 1.., value_delimiter = ',')]
peer_advertise_urls: Vec<String>,
/// Node client listen urls
#[clap(long, num_args = 1.., value_delimiter = ',')]
#[clap(long, required = true, num_args = 1.., value_delimiter = ',')]
client_listen_urls: Vec<String>,
/// Node client advertise urls
#[clap(long, num_args = 1.., value_delimiter = ',')]
Expand Down
1 change: 1 addition & 0 deletions crates/xlineapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ prost = "0.12.3"
serde = { version = "1.0.137", features = ["derive"] }
thiserror = "1.0.37"
tonic = { version = "0.4.1", package = "madsim-tonic" }
utils = { path = "../utils", features = ["parking_lot"] }
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

[build-dependencies]
Expand Down
Loading

0 comments on commit d3f6b87

Please sign in to comment.