From a6d34498856a9877697a2e858c069d79062db566 Mon Sep 17 00:00:00 2001 From: lxl66566 Date: Mon, 4 Nov 2024 14:29:31 +0800 Subject: [PATCH] refactor(xlineapi): use function instead of convertion; use thread_local Signed-off-by: lxl66566 --- crates/xline/src/server/command.rs | 48 ++++--- crates/xlineapi/src/classifier.rs | 214 +++++++++++++---------------- crates/xlineapi/src/command.rs | 83 +++++------ crates/xlineapi/src/lib.rs | 11 +- 4 files changed, 153 insertions(+), 203 deletions(-) diff --git a/crates/xline/src/server/command.rs b/crates/xline/src/server/command.rs index cd564729d..dc201ff8e 100644 --- a/crates/xline/src/server/command.rs +++ b/crates/xline/src/server/command.rs @@ -15,6 +15,7 @@ use parking_lot::RwLock; use tracing::warn; use utils::{barrier::IdBarrier, table_names::META_TABLE}; use xlineapi::{ + classifier::RequestClassifier, command::{Command, CurpClient, SyncResponse}, execute_error::ExecuteError, AlarmAction, AlarmRequest, AlarmType, @@ -22,7 +23,7 @@ use xlineapi::{ use crate::{ revision_number::RevisionNumberGeneratorState, - rpc::{RequestBackend, RequestWrapper}, + rpc::RequestWrapper, storage::{ db::{WriteOp, DB}, index::IndexOperate, @@ -315,22 +316,22 @@ impl CommandExecutor { I: IndexOperate, { let er = to_execute - .then(|| match wrapper.backend() { - RequestBackend::Auth => self.auth_storage.execute(wrapper), - RequestBackend::Lease => self.lease_storage.execute(wrapper), - RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), - RequestBackend::Kv => unreachable!("Should not execute kv commands"), + .then(|| match wrapper { + x if x.is_auth_backend() => self.auth_storage.execute(wrapper), + x if x.is_lease_backend() => self.lease_storage.execute(wrapper), + x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)), + _ => unreachable!("Should not execute kv commands"), }) .transpose()?; - let (asr, wr_ops) = match wrapper.backend() { - RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?, - RequestBackend::Lease => { + let (asr, wr_ops) = match wrapper { + x if x.is_auth_backend() => self.auth_storage.after_sync(wrapper, auth_revision)?, + x if x.is_lease_backend() => { self.lease_storage .after_sync(wrapper, general_revision, txn_db, index)? } - RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision), - RequestBackend::Kv => unreachable!("Should not sync kv commands"), + x if x.is_alarm_backend() => self.alarm_storage.after_sync(wrapper, general_revision), + _ => unreachable!("Should not sync kv commands"), }; txn_db.write_ops(wr_ops)?; @@ -421,11 +422,12 @@ impl CurpCommandExecutor for CommandExecutor { let auth_info = cmd.auth_info(); let wrapper = cmd.request(); self.auth_storage.check_permission(wrapper, auth_info)?; - match wrapper.backend() { - RequestBackend::Kv => self.kv_storage.execute(wrapper, None), - RequestBackend::Auth => self.auth_storage.execute(wrapper), - RequestBackend::Lease => self.lease_storage.execute(wrapper), - RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)), + match &wrapper { + x if x.is_kv_backend() => self.kv_storage.execute(wrapper, None), + x if x.is_auth_backend() => self.auth_storage.execute(wrapper), + x if x.is_lease_backend() => self.lease_storage.execute(wrapper), + x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)), + _ => unreachable!("Must be one of kv, auth, lease, alarm"), } } @@ -438,11 +440,12 @@ impl CurpCommandExecutor for CommandExecutor { > { let er = self.execute(cmd)?; let wrapper = cmd.request(); - let rev = match wrapper.backend() { - RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => { + let rev = match wrapper { + x if x.is_auth_backend() => self.auth_storage.revision_gen().get(), + x if (x.is_kv_backend() || x.is_lease_backend() || x.is_alarm_backend()) => { self.kv_storage.revision_gen().get() } - RequestBackend::Auth => self.auth_storage.revision_gen().get(), + _ => unreachable!("Must be one of kv, auth, lease, alarm"), }; Ok((er, SyncResponse::new(rev))) } @@ -484,15 +487,15 @@ impl CurpCommandExecutor for CommandExecutor { states.update_result(|c| { let (cmd, to_execute) = c.into_parts(); let wrapper = cmd.request(); - let (asr, er) = match wrapper.backend() { - RequestBackend::Kv => self.after_sync_kv( + let (asr, er) = match wrapper { + x if x.is_kv_backend() => self.after_sync_kv( wrapper, &txn_db, &index_state, &general_revision_state, to_execute, ), - RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => self + x if x.is_auth_backend() || x.is_lease_backend() || x.is_alarm_backend() => self .after_sync_others( wrapper, &txn_db, @@ -501,6 +504,7 @@ impl CurpCommandExecutor for CommandExecutor { &auth_revision_state, to_execute, ), + _ => unreachable!("Must be one of kv, auth, lease, alarm"), }?; if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper { diff --git a/crates/xlineapi/src/classifier.rs b/crates/xlineapi/src/classifier.rs index e7e5d0c1d..89ab66eca 100644 --- a/crates/xlineapi/src/classifier.rs +++ b/crates/xlineapi/src/classifier.rs @@ -1,5 +1,18 @@ use super::RequestWrapper; +pub trait RequestClassifier { + fn is_kv_backend(&self) -> bool; + fn is_lease_backend(&self) -> bool; + fn is_auth_backend(&self) -> bool; + fn is_alarm_backend(&self) -> bool; + fn is_put(&self) -> bool; + fn is_compaction(&self) -> bool; + fn is_txn(&self) -> bool; + fn is_range(&self) -> bool; + fn is_read_only(&self) -> bool; + fn is_write(&self) -> bool; +} + /// because RequestWrapper do not repr u8, we need to convert it manually. impl From<&RequestWrapper> for u8 { fn from(value: &RequestWrapper) -> Self { @@ -34,138 +47,95 @@ impl From<&RequestWrapper> for u8 { } } -/// Backend store of request -#[allow(missing_docs)] -#[derive(Debug, PartialEq, Eq)] -pub enum RequestBackend { - Kv, - Auth, - Lease, - Alarm, -} - -impl From<&RequestWrapper> for RequestBackend { - fn from(value: &RequestWrapper) -> Self { - match *value { +impl RequestClassifier for RequestWrapper { + #[inline] + fn is_kv_backend(&self) -> bool { + matches!( + *self, RequestWrapper::PutRequest(_) - | RequestWrapper::RangeRequest(_) - | RequestWrapper::DeleteRangeRequest(_) - | RequestWrapper::TxnRequest(_) - | RequestWrapper::CompactionRequest(_) => RequestBackend::Kv, - RequestWrapper::AuthEnableRequest(_) - | RequestWrapper::AuthDisableRequest(_) - | RequestWrapper::AuthStatusRequest(_) - | RequestWrapper::AuthRoleAddRequest(_) - | RequestWrapper::AuthRoleDeleteRequest(_) - | RequestWrapper::AuthRoleGetRequest(_) - | RequestWrapper::AuthRoleGrantPermissionRequest(_) - | RequestWrapper::AuthRoleListRequest(_) - | RequestWrapper::AuthRoleRevokePermissionRequest(_) - | RequestWrapper::AuthUserAddRequest(_) - | RequestWrapper::AuthUserChangePasswordRequest(_) - | RequestWrapper::AuthUserDeleteRequest(_) - | RequestWrapper::AuthUserGetRequest(_) - | RequestWrapper::AuthUserGrantRoleRequest(_) - | RequestWrapper::AuthUserListRequest(_) - | RequestWrapper::AuthUserRevokeRoleRequest(_) - | RequestWrapper::AuthenticateRequest(_) => RequestBackend::Auth, - RequestWrapper::LeaseGrantRequest(_) - | RequestWrapper::LeaseRevokeRequest(_) - | RequestWrapper::LeaseLeasesRequest(_) => RequestBackend::Lease, - RequestWrapper::AlarmRequest(_) => RequestBackend::Alarm, - } + | RequestWrapper::RangeRequest(_) + | RequestWrapper::DeleteRangeRequest(_) + | RequestWrapper::TxnRequest(_) + | RequestWrapper::CompactionRequest(_) + ) } -} - -/// Type of request. This is the extending of [`RequestBackend`]. -#[allow(missing_docs)] -#[derive(Debug, PartialEq, Eq)] -pub enum RequestType { - Put, - Compaction, - Txn, - Range, - Auth, - Lease, - Alarm, -} - -impl From<&RequestWrapper> for RequestType { - fn from(value: &RequestWrapper) -> Self { - match *value { - RequestWrapper::PutRequest(_) => RequestType::Put, - RequestWrapper::RangeRequest(_) | RequestWrapper::DeleteRangeRequest(_) => { - RequestType::Range - } - RequestWrapper::TxnRequest(_) => RequestType::Txn, - RequestWrapper::CompactionRequest(_) => RequestType::Compaction, + #[inline] + fn is_auth_backend(&self) -> bool { + matches!( + self, RequestWrapper::AuthEnableRequest(_) - | RequestWrapper::AuthDisableRequest(_) - | RequestWrapper::AuthStatusRequest(_) - | RequestWrapper::AuthRoleAddRequest(_) - | RequestWrapper::AuthRoleDeleteRequest(_) - | RequestWrapper::AuthRoleGetRequest(_) - | RequestWrapper::AuthRoleGrantPermissionRequest(_) - | RequestWrapper::AuthRoleListRequest(_) - | RequestWrapper::AuthRoleRevokePermissionRequest(_) - | RequestWrapper::AuthUserAddRequest(_) - | RequestWrapper::AuthUserChangePasswordRequest(_) - | RequestWrapper::AuthUserDeleteRequest(_) - | RequestWrapper::AuthUserGetRequest(_) - | RequestWrapper::AuthUserGrantRoleRequest(_) - | RequestWrapper::AuthUserListRequest(_) - | RequestWrapper::AuthUserRevokeRoleRequest(_) - | RequestWrapper::AuthenticateRequest(_) => RequestType::Auth, + | RequestWrapper::AuthDisableRequest(_) + | RequestWrapper::AuthStatusRequest(_) + | RequestWrapper::AuthRoleAddRequest(_) + | RequestWrapper::AuthRoleDeleteRequest(_) + | RequestWrapper::AuthRoleGetRequest(_) + | RequestWrapper::AuthRoleGrantPermissionRequest(_) + | RequestWrapper::AuthRoleListRequest(_) + | RequestWrapper::AuthRoleRevokePermissionRequest(_) + | RequestWrapper::AuthUserAddRequest(_) + | RequestWrapper::AuthUserChangePasswordRequest(_) + | RequestWrapper::AuthUserDeleteRequest(_) + | RequestWrapper::AuthUserGetRequest(_) + | RequestWrapper::AuthUserGrantRoleRequest(_) + | RequestWrapper::AuthUserListRequest(_) + | RequestWrapper::AuthUserRevokeRoleRequest(_) + | RequestWrapper::AuthenticateRequest(_) + ) + } + #[inline] + fn is_lease_backend(&self) -> bool { + matches!( + self, RequestWrapper::LeaseGrantRequest(_) - | RequestWrapper::LeaseRevokeRequest(_) - | RequestWrapper::LeaseLeasesRequest(_) => RequestType::Lease, - RequestWrapper::AlarmRequest(_) => RequestType::Alarm, - } + | RequestWrapper::LeaseRevokeRequest(_) + | RequestWrapper::LeaseLeasesRequest(_) + ) + } + #[inline] + fn is_alarm_backend(&self) -> bool { + matches!(self, RequestWrapper::AlarmRequest(_)) + } + #[inline] + fn is_put(&self) -> bool { + matches!(self, RequestWrapper::PutRequest(_)) + } + #[inline] + fn is_range(&self) -> bool { + matches!( + self, + RequestWrapper::RangeRequest(_) | RequestWrapper::DeleteRangeRequest(_) + ) + } + #[inline] + fn is_txn(&self) -> bool { + matches!(self, RequestWrapper::TxnRequest(_)) + } + #[inline] + fn is_compaction(&self) -> bool { + matches!(self, RequestWrapper::CompactionRequest(_)) } -} -/// indicates if the request is readonly or write -#[derive(Debug, PartialEq, Eq)] -pub enum RequestRw { + #[inline] /// Read only request - Read, + fn is_read_only(&self) -> bool { + matches!( + self, + RequestWrapper::RangeRequest(_) + | RequestWrapper::AuthStatusRequest(_) + | RequestWrapper::AuthRoleGetRequest(_) + | RequestWrapper::AuthRoleListRequest(_) + | RequestWrapper::AuthUserGetRequest(_) + | RequestWrapper::AuthUserListRequest(_) + | RequestWrapper::LeaseLeasesRequest(_) + ) + } + + #[inline] /// Write request. /// /// NOTE: A `TxnRequest` or a `DeleteRangeRequest` might be read-only, but we /// assume they will mutate the state machine to simplify the implementation. - Write, -} - -impl From<&RequestWrapper> for RequestRw { - fn from(value: &RequestWrapper) -> Self { - match *value { - RequestWrapper::RangeRequest(_) - | RequestWrapper::AuthStatusRequest(_) - | RequestWrapper::AuthRoleGetRequest(_) - | RequestWrapper::AuthRoleListRequest(_) - | RequestWrapper::AuthUserGetRequest(_) - | RequestWrapper::AuthUserListRequest(_) - | RequestWrapper::LeaseLeasesRequest(_) => Self::Read, - - RequestWrapper::PutRequest(_) - | RequestWrapper::DeleteRangeRequest(_) - | RequestWrapper::TxnRequest(_) - | RequestWrapper::CompactionRequest(_) - | RequestWrapper::AuthEnableRequest(_) - | RequestWrapper::AuthDisableRequest(_) - | RequestWrapper::AuthRoleAddRequest(_) - | RequestWrapper::AuthRoleDeleteRequest(_) - | RequestWrapper::AuthRoleGrantPermissionRequest(_) - | RequestWrapper::AuthRoleRevokePermissionRequest(_) - | RequestWrapper::AuthUserAddRequest(_) - | RequestWrapper::AuthUserChangePasswordRequest(_) - | RequestWrapper::AuthUserDeleteRequest(_) - | RequestWrapper::AuthUserGrantRoleRequest(_) - | RequestWrapper::AuthUserRevokeRoleRequest(_) - | RequestWrapper::AuthenticateRequest(_) - | RequestWrapper::LeaseGrantRequest(_) - | RequestWrapper::LeaseRevokeRequest(_) - | RequestWrapper::AlarmRequest(_) => Self::Write, - } + fn is_write(&self) -> bool { + !self.is_read_only() } } diff --git a/crates/xlineapi/src/command.rs b/crates/xlineapi/src/command.rs index 2ee76224a..c34c92ba9 100644 --- a/crates/xlineapi/src/command.rs +++ b/crates/xlineapi/src/command.rs @@ -1,8 +1,7 @@ -use once_cell::sync::Lazy; use std::{ + cell::RefCell, collections::{HashMap, HashSet}, ops::{Bound, RangeBounds}, - sync::RwLock, }; use curp::{client::ClientApi, cmd::Command as CurpCommand}; @@ -12,10 +11,8 @@ use prost::Message; use serde::{Deserialize, Serialize}; use crate::{ - classifier::{RequestBackend, RequestRw, RequestType}, - execute_error::ExecuteError, - AuthInfo, PbCommand, PbCommandResponse, PbKeyRange, PbSyncResponse, RequestWrapper, - ResponseWrapper, + classifier::RequestClassifier, execute_error::ExecuteError, AuthInfo, PbCommand, + PbCommandResponse, PbKeyRange, PbSyncResponse, RequestWrapper, ResponseWrapper, }; /// The curp client trait object on the command of xline @@ -28,9 +25,10 @@ const UNBOUNDED: &[u8] = &[0_u8]; /// Range end to get one key const ONE_KEY: &[u8] = &[]; -/// A global cache to store conflict rules. The rules will not change, so it's safe to use a global cache. -static CONFLICT_RULES_CACHE: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); +thread_local! { + /// A global cache to store conflict rules. The rules will not change, so it's safe to use a global cache. + static CONFLICT_RULES_CACHE: RefCell> = RefCell::new(HashMap::new()); +} /// Key Range for Command #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] @@ -239,29 +237,28 @@ pub struct Command { /// # Example /// /// ```ignore -/// match_all!(Class1::Tag1 & Class2::Tag2)(x) +/// match_all!(is_auth_backend & is_put)(x) /// ``` macro_rules! match_all { - ($($cls:ident :: $tag:ident)&*) => { - |_x| $(matches!($cls::from(_x), $cls::$tag))&&* + ($($func:ident)&*) => { + |_x: &RequestWrapper| $(_x.$func())&&* }; } -pub(crate) use match_all; // used by lib.rs /// swapable match, returns a `Fn(x, y) -> bool` indicates the match result. /// /// # Returns /// -/// Returns `Fn(x, y) -> true` if *x match Class1 and y match Class2*, or *x match Class2 and y match Class1*. +/// Returns `Fn(x, y) -> true` if *x match Classifier1 and y match Classifier2*, +/// or *x match Classifier2 and y match Classifier1*. macro_rules! swap_match { - - ($($cls1:ident::$tag1:ident)&*, _) => {{ - |_x, _| match_all!($($cls1::$tag1)&*)(_x) + ($($func:ident)&*, _) => {{ + |_x, _| match_all!($($func)&*)(_x) }}; - ($($cls1:ident :: $tag1:ident)&*, $($cls2:ident :: $tag2:ident)&*) => { + ($($func1:ident)&*, $($func2:ident)&*) => { |_x, _y| { - (match_all!($($cls1::$tag1)&*)(_x) && match_all!($($cls2::$tag2)&*)(_y)) || ( - match_all!($($cls2::$tag2)&*)(_x) && match_all!($($cls1::$tag1)&*)(_y) + (match_all!($($func1)&*)(_x) && match_all!($($func2)&*)(_y)) || ( + match_all!($($func2)&*)(_x) && match_all!($($func1)&*)(_y) ) } }; @@ -290,9 +287,9 @@ macro_rules! swap_map { /// /// `Fn(x, y) -> Option` indicates the conflict result. macro_rules! is_conflict { - ($(($body:expr, $($cls1:ident :: $tag1:ident)&*, $($pat:tt)*)),*) => { + ($(($body:expr, $($func:ident)&*, $($pat:tt)*)),*) => { |_self, _other| match (_self, _other) { - $((x, y) if swap_match!($($cls1::$tag1)&*, $($pat)*)(x, y) => Some($body),)* + $((x, y) if swap_match!($($func)&*, $($pat)*)(x, y) => Some($body),)* _ => None, } }; @@ -307,37 +304,32 @@ impl ConflictCheck for Command { if cache_key.0 > cache_key.1 { cache_key = (cache_key.1, cache_key.0); } - if let Some(res) = CONFLICT_RULES_CACHE - .read() - .ok() - .and_then(|c| c.get(&cache_key).cloned()) - { + if let Some(res) = CONFLICT_RULES_CACHE.with_borrow(|x| x.get(&cache_key).cloned()) { return res; } let first_step = is_conflict!( // auth read request will not conflict with any request except the auth write request ( true, - RequestBackend::Auth & RequestRw::Read, - RequestBackend::Auth & RequestRw::Write + is_auth_backend & is_read_only, + is_auth_backend & is_write ), - (false, RequestBackend::Auth & RequestRw::Read, _), + (false, is_auth_backend & is_read_only, _), // any two requests that don't meet the above conditions will conflict with each other // because the auth write request will make all previous token invalid - (true, RequestBackend::Auth & RequestRw::Write, _), - (true, RequestBackend::Alarm, _), + (true, is_auth_backend & is_write, _), + (true, is_alarm_backend, _), // Lease leases request is conflict with Lease grant and revoke requests ( true, - RequestBackend::Lease & RequestRw::Read, - RequestBackend::Lease & RequestRw::Write + is_lease_backend & is_read_only, + is_lease_backend & is_write ), - (true, RequestType::Compaction, RequestType::Compaction) + (true, is_compaction, is_compaction) )(t_req, o_req); if let Some(first_step_res) = first_step { - if let Ok(mut cache) = CONFLICT_RULES_CACHE.write() { - cache.insert((t_req.into(), o_req.into()), first_step_res); - } + CONFLICT_RULES_CACHE + .with_borrow_mut(|x| x.insert((t_req.into(), o_req.into()), first_step_res)); } first_step .or_else(|| { @@ -554,7 +546,7 @@ impl CurpCommand for Command { #[inline] fn is_read_only(&self) -> bool { - match_all!(RequestRw::Read)(self.request()) + self.request().is_read_only() } } @@ -639,20 +631,11 @@ mod test { ..Default::default() })); let cache_key = ((&cmd1.request).into(), (&cmd2.request).into()); - if CONFLICT_RULES_CACHE - .read() - .unwrap() - .get(&cache_key) - .is_some() - { + if CONFLICT_RULES_CACHE.with_borrow(|x| x.get(&cache_key).is_some()) { return; } let _ig = ConflictCheck::is_conflict(&cmd1, &cmd2); - assert!(CONFLICT_RULES_CACHE - .read() - .unwrap() - .get(&cache_key) - .is_some()); + assert!(CONFLICT_RULES_CACHE.with_borrow(|x| x.get(&cache_key).is_some())); } #[test] diff --git a/crates/xlineapi/src/lib.rs b/crates/xlineapi/src/lib.rs index a44f54617..aa2f1ca4e 100644 --- a/crates/xlineapi/src/lib.rs +++ b/crates/xlineapi/src/lib.rs @@ -208,12 +208,12 @@ mod errorpb { use std::fmt::Display; +use classifier::RequestClassifier; use command::KeyRange; use utils::write_vec; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, - classifier::{RequestBackend, RequestRw}, commandpb::{ command::{AuthInfo, RequestWrapper}, command_response::ResponseWrapper, @@ -275,8 +275,6 @@ pub use self::{ }, }; -use crate::command::match_all; - impl User { /// Check if user has the given role pub fn has_role(&self, role: &str) -> bool { @@ -445,14 +443,9 @@ impl RequestWrapper { } } - /// Get the backend of the request - pub fn backend(&self) -> RequestBackend { - RequestBackend::from(self) - } - /// Check whether this auth request should skip the revision or not pub fn skip_auth_revision(&self) -> bool { - match_all!(RequestRw::Read)(self) + self.is_read_only() || matches!( *self, RequestWrapper::AuthEnableRequest(_) | RequestWrapper::AuthenticateRequest(_)