Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(xlineapi): improve is_conflict #932

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ use parking_lot::RwLock;
use tracing::warn;
use utils::{barrier::IdBarrier, table_names::META_TABLE};
use xlineapi::{
classifier::RequestClassifier,
command::{Command, CurpClient, SyncResponse},
execute_error::ExecuteError,
AlarmAction, AlarmRequest, AlarmType,
};

use crate::{
revision_number::RevisionNumberGeneratorState,
rpc::{RequestBackend, RequestWrapper},
rpc::RequestWrapper,
storage::{
db::{WriteOp, DB},
index::IndexOperate,
Expand Down Expand Up @@ -315,22 +316,22 @@ impl CommandExecutor {
I: IndexOperate,
{
let er = to_execute
.then(|| match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
RequestBackend::Kv => unreachable!("Should not execute kv commands"),
.then(|| match wrapper {
x if x.is_auth_backend() => self.auth_storage.execute(wrapper),
x if x.is_lease_backend() => self.lease_storage.execute(wrapper),
x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)),
_ => unreachable!("Should not execute kv commands"),
})
.transpose()?;

let (asr, wr_ops) = match wrapper.backend() {
RequestBackend::Auth => self.auth_storage.after_sync(wrapper, auth_revision)?,
RequestBackend::Lease => {
let (asr, wr_ops) = match wrapper {
x if x.is_auth_backend() => self.auth_storage.after_sync(wrapper, auth_revision)?,
x if x.is_lease_backend() => {
self.lease_storage
.after_sync(wrapper, general_revision, txn_db, index)?
}
RequestBackend::Alarm => self.alarm_storage.after_sync(wrapper, general_revision),
RequestBackend::Kv => unreachable!("Should not sync kv commands"),
x if x.is_alarm_backend() => self.alarm_storage.after_sync(wrapper, general_revision),
_ => unreachable!("Should not sync kv commands"),
};

txn_db.write_ops(wr_ops)?;
Expand Down Expand Up @@ -421,11 +422,12 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
let auth_info = cmd.auth_info();
let wrapper = cmd.request();
self.auth_storage.check_permission(wrapper, auth_info)?;
match wrapper.backend() {
RequestBackend::Kv => self.kv_storage.execute(wrapper, None),
RequestBackend::Auth => self.auth_storage.execute(wrapper),
RequestBackend::Lease => self.lease_storage.execute(wrapper),
RequestBackend::Alarm => Ok(self.alarm_storage.execute(wrapper)),
match &wrapper {
x if x.is_kv_backend() => self.kv_storage.execute(wrapper, None),
x if x.is_auth_backend() => self.auth_storage.execute(wrapper),
x if x.is_lease_backend() => self.lease_storage.execute(wrapper),
x if x.is_alarm_backend() => Ok(self.alarm_storage.execute(wrapper)),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
}
}

Expand All @@ -438,11 +440,12 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
> {
let er = self.execute(cmd)?;
let wrapper = cmd.request();
let rev = match wrapper.backend() {
RequestBackend::Kv | RequestBackend::Lease | RequestBackend::Alarm => {
let rev = match wrapper {
x if x.is_auth_backend() => self.auth_storage.revision_gen().get(),
x if (x.is_kv_backend() || x.is_lease_backend() || x.is_alarm_backend()) => {
self.kv_storage.revision_gen().get()
}
RequestBackend::Auth => self.auth_storage.revision_gen().get(),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
};
Ok((er, SyncResponse::new(rev)))
}
Expand Down Expand Up @@ -484,15 +487,15 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
states.update_result(|c| {
let (cmd, to_execute) = c.into_parts();
let wrapper = cmd.request();
let (asr, er) = match wrapper.backend() {
RequestBackend::Kv => self.after_sync_kv(
let (asr, er) = match wrapper {
x if x.is_kv_backend() => self.after_sync_kv(
wrapper,
&txn_db,
&index_state,
&general_revision_state,
to_execute,
),
RequestBackend::Auth | RequestBackend::Lease | RequestBackend::Alarm => self
x if x.is_auth_backend() || x.is_lease_backend() || x.is_alarm_backend() => self
.after_sync_others(
wrapper,
&txn_db,
Expand All @@ -501,6 +504,7 @@ impl CurpCommandExecutor<Command> for CommandExecutor {
&auth_revision_state,
to_execute,
),
_ => unreachable!("Must be one of kv, auth, lease, alarm"),
}?;

if let RequestWrapper::CompactionRequest(ref compact_req) = *wrapper {
Expand Down
141 changes: 141 additions & 0 deletions crates/xlineapi/src/classifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use super::RequestWrapper;

pub trait RequestClassifier {
fn is_kv_backend(&self) -> bool;
fn is_lease_backend(&self) -> bool;
fn is_auth_backend(&self) -> bool;
fn is_alarm_backend(&self) -> bool;
fn is_put(&self) -> bool;
fn is_compaction(&self) -> bool;
fn is_txn(&self) -> bool;
fn is_range(&self) -> bool;
fn is_read_only(&self) -> bool;
fn is_write(&self) -> bool;
}

/// because RequestWrapper do not repr u8, we need to convert it manually.
impl From<&RequestWrapper> for u8 {
fn from(value: &RequestWrapper) -> Self {
match *value {
RequestWrapper::PutRequest(_) => 0,
RequestWrapper::RangeRequest(_) => 1,
RequestWrapper::DeleteRangeRequest(_) => 2,
RequestWrapper::TxnRequest(_) => 3,
RequestWrapper::CompactionRequest(_) => 4,
RequestWrapper::AuthEnableRequest(_) => 5,
RequestWrapper::AuthDisableRequest(_) => 6,
RequestWrapper::AuthStatusRequest(_) => 7,
RequestWrapper::AuthRoleAddRequest(_) => 8,
RequestWrapper::AuthRoleDeleteRequest(_) => 9,
RequestWrapper::AuthRoleGetRequest(_) => 10,
RequestWrapper::AuthRoleGrantPermissionRequest(_) => 11,
RequestWrapper::AuthRoleListRequest(_) => 12,
RequestWrapper::AuthRoleRevokePermissionRequest(_) => 13,
RequestWrapper::AuthUserAddRequest(_) => 14,
RequestWrapper::AuthUserChangePasswordRequest(_) => 15,
RequestWrapper::AuthUserDeleteRequest(_) => 16,
RequestWrapper::AuthUserGetRequest(_) => 17,
RequestWrapper::AuthUserGrantRoleRequest(_) => 18,
RequestWrapper::AuthUserListRequest(_) => 19,
RequestWrapper::AuthUserRevokeRoleRequest(_) => 20,
RequestWrapper::AuthenticateRequest(_) => 21,
RequestWrapper::LeaseGrantRequest(_) => 22,
RequestWrapper::LeaseRevokeRequest(_) => 23,
RequestWrapper::LeaseLeasesRequest(_) => 24,
RequestWrapper::AlarmRequest(_) => 25,
}
}
}
bsbds marked this conversation as resolved.
Show resolved Hide resolved

impl RequestClassifier for RequestWrapper {
#[inline]
fn is_kv_backend(&self) -> bool {
matches!(
*self,
RequestWrapper::PutRequest(_)
| RequestWrapper::RangeRequest(_)
| RequestWrapper::DeleteRangeRequest(_)
| RequestWrapper::TxnRequest(_)
| RequestWrapper::CompactionRequest(_)
)
}
#[inline]
fn is_auth_backend(&self) -> bool {
matches!(
self,
RequestWrapper::AuthEnableRequest(_)
| RequestWrapper::AuthDisableRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleAddRequest(_)
| RequestWrapper::AuthRoleDeleteRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleGrantPermissionRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthRoleRevokePermissionRequest(_)
| RequestWrapper::AuthUserAddRequest(_)
| RequestWrapper::AuthUserChangePasswordRequest(_)
| RequestWrapper::AuthUserDeleteRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserGrantRoleRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::AuthUserRevokeRoleRequest(_)
| RequestWrapper::AuthenticateRequest(_)
)
}
#[inline]
fn is_lease_backend(&self) -> bool {
matches!(
self,
RequestWrapper::LeaseGrantRequest(_)
| RequestWrapper::LeaseRevokeRequest(_)
| RequestWrapper::LeaseLeasesRequest(_)
)
}
#[inline]
fn is_alarm_backend(&self) -> bool {
matches!(self, RequestWrapper::AlarmRequest(_))
}
#[inline]
fn is_put(&self) -> bool {
matches!(self, RequestWrapper::PutRequest(_))
}
#[inline]
fn is_range(&self) -> bool {
matches!(
self,
RequestWrapper::RangeRequest(_) | RequestWrapper::DeleteRangeRequest(_)
)
}
#[inline]
fn is_txn(&self) -> bool {
matches!(self, RequestWrapper::TxnRequest(_))
}
#[inline]
fn is_compaction(&self) -> bool {
matches!(self, RequestWrapper::CompactionRequest(_))
}

#[inline]
/// Read only request
fn is_read_only(&self) -> bool {
matches!(
self,
RequestWrapper::RangeRequest(_)
| RequestWrapper::AuthStatusRequest(_)
| RequestWrapper::AuthRoleGetRequest(_)
| RequestWrapper::AuthRoleListRequest(_)
| RequestWrapper::AuthUserGetRequest(_)
| RequestWrapper::AuthUserListRequest(_)
| RequestWrapper::LeaseLeasesRequest(_)
)
}

#[inline]
/// Write request.
///
/// NOTE: A `TxnRequest` or a `DeleteRangeRequest` might be read-only, but we
/// assume they will mutate the state machine to simplify the implementation.
fn is_write(&self) -> bool {
!self.is_read_only()
}
}
Loading
Loading