Skip to content

Commit

Permalink
refactor: simplify raw_curp::push_back
Browse files Browse the repository at this point in the history
Signed-off-by: GFX9 <[email protected]>
  • Loading branch information
GFX9 committed Apr 19, 2024
1 parent 26eef68 commit e0883d0
Showing 1 changed file with 23 additions and 71 deletions.
94 changes: 23 additions & 71 deletions crates/curp/src/server/raw_curp/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bincode::serialized_size;
use clippy_utilities::{NumericCast, OverflowArithmetic};
use itertools::Itertools;
use tokio::sync::mpsc;
use tracing::{error, trace, warn};
use tracing::{error, warn};

use crate::{
cmd::Command,
Expand Down Expand Up @@ -118,7 +118,6 @@ impl<C: Command> LogEntryVecDeque<C> {

/// push a log entry into the back of queue
fn push_back(&mut self, entry: Arc<LogEntry<C>>) -> Result<(), bincode::Error> {
#![allow(clippy::indexing_slicing)]
let entry_size = serialized_size(&entry)?;

if entry_size > self.batch_limit {
Expand All @@ -128,46 +127,26 @@ impl<C: Command> LogEntryVecDeque<C> {
self.entries.push_back(entry);
self.entry_size.push_back(entry_size);
self.batch_index.push_back(0); // placeholder

if entry_size > self.batch_limit {
let entry_idx = self.batch_index.len() - 1;
for prev_idx in self.first_entry_at_last_batch..entry_idx {
self.batch_index[prev_idx] = entry_idx - prev_idx; // record offset but not absolute index
}
self.batch_index[entry_idx] = 1;
self.last_batch_size = 0;
self.first_entry_at_last_batch = entry_idx + 1;
return Ok(());
}

while self.last_batch_size + entry_size > self.batch_limit
&& self.first_entry_at_last_batch < self.entries.len()
{
self.batch_index[self.first_entry_at_last_batch] =
self.entries.len() - 1 - self.first_entry_at_last_batch; // record offset but not absolute index
self.last_batch_size -= self.entry_size[self.first_entry_at_last_batch];
self.first_entry_at_last_batch += 1;
}

self.last_batch_size += entry_size;

if self.first_entry_at_last_batch >= self.entries.len() {
self.batch_index[self.entries.len() - 1] = 1;
}

if self.last_batch_size == self.batch_limit {
self.batch_index[self.first_entry_at_last_batch] =
self.entries.len() - self.first_entry_at_last_batch; // record offset but not absolute index
while self.last_batch_size > self.batch_limit {
if let Some(cur_batch_index) = self.batch_index.get_mut(self.first_entry_at_last_batch)
{
*cur_batch_index = self.entries.len() - 1 - self.first_entry_at_last_batch;
if let Some(cur_entry_size) = self.entry_size.get(self.first_entry_at_last_batch) {
self.last_batch_size -= *cur_entry_size;
}
self.first_entry_at_last_batch += 1;
}
}

Ok(())
}

/// pop a log entry from the front of queue
fn pop_front(&mut self) -> Option<Arc<LogEntry<C>>> {
#![allow(clippy::indexing_slicing)]
if self.entries.front().is_some() {
let front_size = self.entry_size[0];
let front_size = *self.entry_size.front().unwrap_or_else(|| unreachable!("The entry_size cannot be empty"));

if self.first_entry_at_last_batch == 0 {
self.last_batch_size -= front_size;
Expand Down Expand Up @@ -213,16 +192,13 @@ impl<C: Command> LogEntryVecDeque<C> {
}

/// Get the range [left, right) of the log entry, whose size should be equal or smaller than `batch_limit`
#[allow(clippy::range_plus_one)]
fn get_range_by_batch(&self, left: usize) -> Range<usize> {
#![allow(clippy::indexing_slicing)]
if left >= self.batch_index.len() {
trace!(
"left = {}, self.batch_index.len() = {}, self.entries.len() = {}",
left,
self.batch_index.len(),
self.entries.len()
);
left..self.entries.len()
} else if self.entry_size[left] > self.batch_limit {
left..left + 1
} else if self.batch_index[left] == 0 {
left..self.entries.len()
} else {
Expand All @@ -240,6 +216,9 @@ impl<C: Command> LogEntryVecDeque<C> {
fn has_next_batch(&self, left: usize) -> bool {
if let Some(&offset) = self.batch_index.get(left) {
offset != 0
|| (self.first_entry_at_last_batch == left
&& self.batch_limit == self.last_batch_size)
|| left == self.batch_index.len() - 1
} else {
false
}
Expand All @@ -252,41 +231,14 @@ impl<C: Command> LogEntryVecDeque<C> {
self.batch_limit = batch_limit;
self.last_batch_size = 0;
self.first_entry_at_last_batch = 0;
self.batch_index.iter_mut().for_each(|val| *val = 0);

for entry_idx in 0..self.entries.len() {
let entry_size = self.entry_size[entry_idx];

if entry_size > self.batch_limit {
for prev_idx in self.first_entry_at_last_batch..entry_idx {
self.batch_index[prev_idx] = entry_idx - prev_idx; // record offset but not absolute index
}
self.batch_index[entry_idx] = 1;
self.last_batch_size = 0;
self.first_entry_at_last_batch = entry_idx + 1;
continue;
}

while self.last_batch_size + entry_size > self.batch_limit
&& self.first_entry_at_last_batch < self.entries.len()
{
self.batch_index[self.first_entry_at_last_batch] =
entry_idx - self.first_entry_at_last_batch; // record offset but not absolute index
self.last_batch_size -= self.entry_size[self.first_entry_at_last_batch];
self.first_entry_at_last_batch += 1;
}

self.last_batch_size += entry_size;
self.batch_index.clear();

if self.first_entry_at_last_batch >= self.entries.len() {
self.batch_index[entry_idx] = 1;
}
let prev_entries = self.entries.clone();
self.entries.clear();

if entry_idx == self.entries.len() - 1 && self.last_batch_size == self.batch_limit {
self.batch_index[self.first_entry_at_last_batch] =
self.entries.len() - self.first_entry_at_last_batch; // record offset but not absolute index
}
}
let _unused = prev_entries.into_iter().for_each(|item| {
let _u = self.push_back(item);
});
}
}

Expand Down

0 comments on commit e0883d0

Please sign in to comment.