Skip to content

Commit

Permalink
refactor(xlineapi): use function instead of convertion; use thread_local
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Nov 4, 2024
1 parent 60343b5 commit a6d3449
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 203 deletions.
48 changes: 26 additions & 22 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ use parking_lot::RwLock;
use tracing::warn;
use utils::{barrier::IdBarrier, table_names::META_TABLE};
use xlineapi::{
classifier::RequestClassifier,
command::{Command, CurpClient, SyncResponse},
execute_error::ExecuteError,
AlarmAction, AlarmRequest, AlarmType,
};

use crate::{
revision_number::RevisionNumberGeneratorState,
rpc::{RequestBackend, RequestWrapper},
rpc::RequestWrapper,
storage::{
db::{WriteOp, DB},
index::IndexOperate,
Expand Down Expand Up @@ -315,22 +316,22 @@ impl CommandExecutor {
I: IndexOperate,
{
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
RequestBackend::Kv => unreachable!("Should not execute kv commands"),
.then(|| match wrapper {
x if x.is_auth_backend() => self.auth_storage.execute(wrapper),
x if x.is_lease_backend() => self.lease_storage.execute(wrapper),
x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)),
_ => unreachable!("Should not execute kv commands"),
})
.transpose()?;

let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?,
RequestBackend::Lease => {
let (asr, wr_ops) = match wrapper {
x if x.is_auth_backend() => self.auth_storage.after_sync(wrapper, auth_revision)?,
x if x.is_lease_backend() => {
self.lease_storage
.after_sync(wrapper, general_revision, txn_db, index)?
}
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision),
RequestBackend::Kv => unreachable!("Should not sync kv commands"),
x if x.is_alarm_backend() => self.alarm_storage.after_sync(wrapper, general_revision),
_ => unreachable!("Should not sync kv commands"),
};

txn_db.write_ops(wr_ops)?;
Expand Down Expand Up @@ -421,11 +422,12 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
let auth_info = cmd.auth_info();
let wrapper = cmd.request();
self.auth_storage.check_permission(wrapper, auth_info)?;
match wrapper.backend() {
RequestBackend::Kv => self.kv_storage.execute(wrapper, None),
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
match &wrapper {
x if x.is_kv_backend() => self.kv_storage.execute(wrapper, None),
x if x.is_auth_backend() => self.auth_storage.execute(wrapper),
x if x.is_lease_backend() => self.lease_storage.execute(wrapper),
x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
}
}

Expand All @@ -438,11 +440,12 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
> {
let er = self.execute(cmd)?;
let wrapper = cmd.request();
let rev = match wrapper.backend() {
RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => {
let rev = match wrapper {
x if x.is_auth_backend() => self.auth_storage.revision_gen().get(),
x if (x.is_kv_backend() || x.is_lease_backend() || x.is_alarm_backend()) => {
self.kv_storage.revision_gen().get()
}
RequestBackend::Auth => self.auth_storage.revision_gen().get(),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
};
Ok((er, SyncResponse::new(rev)))
}
Expand Down Expand Up @@ -484,15 +487,15 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
states.update_result(|c| {
let (cmd, to_execute) = c.into_parts();
let wrapper = cmd.request();
let (asr, er) = match wrapper.backend() {
RequestBackend::Kv => self.after_sync_kv(
let (asr, er) = match wrapper {
x if x.is_kv_backend() => self.after_sync_kv(
wrapper,
&txn_db,
&index_state,
&general_revision_state,
to_execute,
),
RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => self
x if x.is_auth_backend() || x.is_lease_backend() || x.is_alarm_backend() => self
.after_sync_others(
wrapper,
&txn_db,
Expand All @@ -501,6 +504,7 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
&auth_revision_state,
to_execute,
),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
}?;

if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper {
Expand Down
214 changes: 92 additions & 122 deletions crates/xlineapi/src/classifier.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
use super::RequestWrapper;

pub trait RequestClassifier {
fn is_kv_backend(&self) -> bool;
fn is_lease_backend(&self) -> bool;
fn is_auth_backend(&self) -> bool;
fn is_alarm_backend(&self) -> bool;
fn is_put(&self) -> bool;
fn is_compaction(&self) -> bool;
fn is_txn(&self) -> bool;
fn is_range(&self) -> bool;
fn is_read_only(&self) -> bool;
fn is_write(&self) -> bool;
}

/// because RequestWrapper do not repr u8, we need to convert it manually.
impl From<&RequestWrapper> for u8 {
fn from(value: &RequestWrapper) -> Self {
Expand Down Expand Up @@ -34,138 +47,95 @@ impl From<&RequestWrapper> for u8 {
}
}

/// Backend store of request
#[allow(missing_docs)]
#[derive(Debug, PartialEq, Eq)]
pub enum RequestBackend {
Kv,
Auth,
Lease,
Alarm,
}

impl From<&RequestWrapper> for RequestBackend {
fn from(value: &RequestWrapper) -> Self {
match *value {
impl RequestClassifier for RequestWrapper {
#[inline]
fn is_kv_backend(&self) -> bool {
matches!(
*self,
RequestWrapper::PutRequest(_)
| RequestWrapper::RangeRequest(_)
| RequestWrapper::DeleteRangeRequest(_)
| RequestWrapper::TxnRequest(_)
| RequestWrapper::CompactionRequest(_) => RequestBackend::Kv,
RequestWrapper::AuthEnableRequest(_)
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_) => RequestBackend::Auth,
RequestWrapper::LeaseGrantRequest(_)
| RequestWrapper::LeaseRevokeRequest(_)
| RequestWrapper::LeaseLeasesRequest(_) => RequestBackend::Lease,
RequestWrapper::AlarmRequest(_) => RequestBackend::Alarm,
}
| RequestWrapper::RangeRequest(_)
| RequestWrapper::DeleteRangeRequest(_)
| RequestWrapper::TxnRequest(_)
| RequestWrapper::CompactionRequest(_)
)
}
}

/// Type of request. This is the extending of [`RequestBackend`].
#[allow(missing_docs)]
#[derive(Debug, PartialEq, Eq)]
pub enum RequestType {
Put,
Compaction,
Txn,
Range,
Auth,
Lease,
Alarm,
}

impl From<&RequestWrapper> for RequestType {
fn from(value: &RequestWrapper) -> Self {
match *value {
RequestWrapper::PutRequest(_) => RequestType::Put,
RequestWrapper::RangeRequest(_) | RequestWrapper::DeleteRangeRequest(_) => {
RequestType::Range
}
RequestWrapper::TxnRequest(_) => RequestType::Txn,
RequestWrapper::CompactionRequest(_) => RequestType::Compaction,
#[inline]
fn is_auth_backend(&self) -> bool {
matches!(
self,
RequestWrapper::AuthEnableRequest(_)
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_) => RequestType::Auth,
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_)
)
}
#[inline]
fn is_lease_backend(&self) -> bool {
matches!(
self,
RequestWrapper::LeaseGrantRequest(_)
| RequestWrapper::LeaseRevokeRequest(_)
| RequestWrapper::LeaseLeasesRequest(_) => RequestType::Lease,
RequestWrapper::AlarmRequest(_) => RequestType::Alarm,
}
| RequestWrapper::LeaseRevokeRequest(_)
| RequestWrapper::LeaseLeasesRequest(_)
)
}
#[inline]
fn is_alarm_backend(&self) -> bool {
matches!(self, RequestWrapper::AlarmRequest(_))
}
#[inline]
fn is_put(&self) -> bool {
matches!(self, RequestWrapper::PutRequest(_))
}
#[inline]
fn is_range(&self) -> bool {
matches!(
self,
RequestWrapper::RangeRequest(_) | RequestWrapper::DeleteRangeRequest(_)
)
}
#[inline]
fn is_txn(&self) -> bool {
matches!(self, RequestWrapper::TxnRequest(_))
}
#[inline]
fn is_compaction(&self) -> bool {
matches!(self, RequestWrapper::CompactionRequest(_))
}
}

/// indicates if the request is readonly or write
#[derive(Debug, PartialEq, Eq)]
pub enum RequestRw {
#[inline]
/// Read only request
Read,
fn is_read_only(&self) -> bool {
matches!(
self,
RequestWrapper::RangeRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::LeaseLeasesRequest(_)
)
}

#[inline]
/// Write request.
///
/// NOTE: A `TxnRequest` or a `DeleteRangeRequest` might be read-only, but we
/// assume they will mutate the state machine to simplify the implementation.
Write,
}

impl From<&RequestWrapper> for RequestRw {
fn from(value: &RequestWrapper) -> Self {
match *value {
RequestWrapper::RangeRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::LeaseLeasesRequest(_) => Self::Read,

RequestWrapper::PutRequest(_)
| RequestWrapper::DeleteRangeRequest(_)
| RequestWrapper::TxnRequest(_)
| RequestWrapper::CompactionRequest(_)
| RequestWrapper::AuthEnableRequest(_)
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_)
| RequestWrapper::LeaseGrantRequest(_)
| RequestWrapper::LeaseRevokeRequest(_)
| RequestWrapper::AlarmRequest(_) => Self::Write,
}
fn is_write(&self) -> bool {
!self.is_read_only()
}
}
Loading

0 comments on commit a6d3449

Please sign in to comment.