fix(wal): xline-kv#854
fix(wal): xline-kv#854
Signed-off-by: bsbds <[email protected]>
bsbds committed Jul 12, 2024
1 parent bc33a36 commit 15cdc15
Showing 4 changed files with 53 additions and 50 deletions.
29 changes: 18 additions & 11 deletions crates/curp/src/server/storage/wal/codec.rs
@@ -118,7 +118,10 @@ where

     /// Encodes a frame
     fn encode(&mut self, frames: Vec<DataFrame<'_, C>>) -> Result<Vec<u8>, Self::Error> {
-        let mut frame_data: Vec<_> = frames.into_iter().flat_map(|f| f.encode()).collect();
+        let mut frame_data = Vec::new();
+        for frame in frames {
+            frame_data.extend_from_slice(&frame.encode());
+        }
         let commit_frame = CommitFrame::new_from_data(&frame_data);
         frame_data.extend_from_slice(&commit_frame.encode());

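The new loop produces the same bytes as the removed `flat_map` chain; only the construction changes. The batch layout itself is unchanged: each data frame's encoding back to back, then a commit frame computed over the accumulated bytes. A minimal sketch of that layout with toy byte-vector frames (the `checksum` parameter is illustrative, not the crate's `CommitFrame`):

// Concatenate pre-encoded frames, then append a trailing commit
// frame derived from everything written so far.
fn encode_batch(frames: &[Vec<u8>], checksum: impl Fn(&[u8]) -> Vec<u8>) -> Vec<u8> {
    let mut frame_data = Vec::new();
    for frame in frames {
        frame_data.extend_from_slice(frame);
    }
    let commit = checksum(&frame_data);
    frame_data.extend_from_slice(&commit);
    frame_data
}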
@@ -285,17 +288,18 @@
                 .unwrap_or_else(|_| unreachable!("serialization should never fail"));
             let len = entry_bytes.len();
             assert_eq!(len >> 56, 0, "log entry length: {len} too large");
-            let len_bytes = len.to_le_bytes().into_iter().take(7);
-            let header = std::iter::once(self.frame_type()).chain(len_bytes);
-            header.chain(entry_bytes).collect()
+            let mut bytes = Vec::with_capacity(1 + 7 + entry_bytes.len());
+            bytes.push(self.frame_type());
+            bytes.extend_from_slice(&len.to_le_bytes()[..7]);
+            bytes.extend_from_slice(&entry_bytes);
+            bytes
         }
         DataFrame::SealIndex(index) => {
             assert_eq!(index >> 56, 0, "log index: {index} too large");
-            // use the first 7 bytes
-            let index_bytes = index.to_le_bytes().into_iter().take(7);
-            std::iter::once(self.frame_type())
-                .chain(index_bytes)
-                .collect()
+            let mut bytes = index.to_le_bytes();
+            bytes.rotate_right(1);
+            bytes[0] = self.frame_type();
+            bytes.to_vec()
         }
     }
 }
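Both the removed iterator chain and the new in-place version emit the same eight bytes: the frame type followed by the low seven bytes of the little-endian index. The assert guarantees the high byte is zero, so `rotate_right(1)` moves that zero byte to the front, where it is overwritten with the type. A standalone sketch (the `encode_seal_index` helper and the `0x02` type byte are hypothetical, not the crate's API):

/// Encodes a seal-index frame: [type, low 7 bytes of the index, LE].
fn encode_seal_index(frame_type: u8, index: u64) -> Vec<u8> {
    assert_eq!(index >> 56, 0, "log index: {index} too large");
    let mut bytes = index.to_le_bytes();
    // The high (eighth) byte is zero, so rotating it to the front
    // loses nothing; it is then overwritten with the frame type.
    bytes.rotate_right(1);
    bytes[0] = frame_type;
    bytes.to_vec()
}

#[test]
fn seal_index_layout() {
    let encoded = encode_seal_index(0x02, 0x0011_2233_4455_6677);
    assert_eq!(encoded, [0x02, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11]);
}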
@@ -327,8 +331,11 @@ impl FrameEncoder for CommitFrame {
         clippy::indexing_slicing // The slicing is checked
     )]
     fn encode(&self) -> Vec<u8> {
-        let header = std::iter::once(self.frame_type()).chain([0u8; 7]);
-        header.chain(self.checksum.clone()).collect()
+        let mut bytes = Vec::with_capacity(8 + self.checksum.len());
+        bytes.extend_from_slice(&[0; 8]);
+        bytes[0] = self.frame_type();
+        bytes.extend_from_slice(&self.checksum);
+        bytes
     }
 }

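The commit frame keeps the same eight-byte header shape as the data frames: one type byte, then seven bytes that stay zero where the other frame kinds carry a length or an index, then the checksum bytes. A sketch with illustrative parameters rather than the crate's `CommitFrame` type:

// Header: [type, 0, 0, 0, 0, 0, 0, 0], followed by the checksum.
fn encode_commit(frame_type: u8, checksum: &[u8]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(8 + checksum.len());
    bytes.extend_from_slice(&[0; 8]);
    bytes[0] = frame_type;
    bytes.extend_from_slice(checksum);
    bytes
}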
7 changes: 3 additions & 4 deletions crates/curp/src/server/storage/wal/mod.rs
@@ -155,10 +155,9 @@ where
     /// Send frames with fsync
     #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
     pub(super) fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
-        let last_segment = self
-            .segments
-            .last_mut()
-            .unwrap_or_else(|| unreachable!("there should be at least on segment"));
+        let Some(last_segment) = self.segments.last_mut() else {
+            return Ok(());
+        };
         if let Some(DataFrame::Entry(entry)) = item.last() {
             self.next_log_index = entry.index.overflow_add(1);
         }
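Where the removed code declared an empty segment list unreachable and would have panicked, the `let ... else` guard turns a send on a WAL with no segments into a no-op. A minimal sketch of the pattern on a toy type (the `u64` segments stand in for real WAL segments):

struct Wal {
    segments: Vec<u64>,
}

impl Wal {
    fn send_sync(&mut self) -> std::io::Result<()> {
        // Early-return instead of unwrap_or_else(|| unreachable!(...)).
        let Some(last_segment) = self.segments.last_mut() else {
            return Ok(());
        };
        *last_segment += 1; // stand-in for writing frames and fsyncing
        Ok(())
    }
}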
42 changes: 16 additions & 26 deletions crates/curp/src/server/storage/wal/pipeline.rs
@@ -34,6 +34,9 @@ pub(super) struct FilePipeline {
     stopped: Arc<AtomicBool>,
     /// Join handle of the allocation task
     file_alloc_task_handle: Option<JoinHandle<()>>,
+    #[cfg_attr(not(madsim), allow(unused))]
+    /// File count used in madsim tests
+    file_count: usize,
 }

 impl FilePipeline {
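The `cfg_attr` keeps the new field from tripping dead-code lints in non-simulation builds, where `file_count` is written but never read. The same shape on a toy struct:

struct Example {
    // Only the madsim iterator reads this; elsewhere the lint is silenced.
    #[cfg_attr(not(madsim), allow(unused))]
    file_count: usize,
}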
@@ -43,13 +46,13 @@ impl FilePipeline {
             error!("Failed to clean up tmp files: {e}");
         }

-        let (file_tx, file_rx) = flume::bounded(1);
         let dir_c = dir.clone();
         let stopped = Arc::new(AtomicBool::new(false));
         let stopped_c = Arc::clone(&stopped);

         #[cfg(not(madsim))]
         {
+            let (file_tx, file_rx) = flume::bounded(1);
             let file_alloc_task_handle = std::thread::spawn(move || {
                 let mut file_count = 0;
                 loop {
@@ -80,41 +83,19 @@
                 file_iter: Some(file_rx.into_iter()),
                 stopped,
                 file_alloc_task_handle: Some(file_alloc_task_handle),
+                file_count: 0,
             }
         }

         #[cfg(madsim)]
         {
-            let _ignore = tokio::spawn(async move {
-                let mut file_count = 0;
-                loop {
-                    match Self::alloc(&dir_c, file_size, &mut file_count) {
-                        Ok(file) => {
-                            if file_tx.send_async(file).await.is_err() {
-                                // The receiver is already dropped, stop this task
-                                break;
-                            }
-                            if stopped_c.load(Ordering::Relaxed) {
-                                if let Err(e) = Self::clean_up(&dir_c) {
-                                    error!("failed to clean up pipeline temp files: {e}");
-                                }
-                                break;
-                            }
-                        }
-                        Err(e) => {
-                            error!("failed to allocate file: {e}");
-                            break;
-                        }
-                    }
-                }
-            });
-
             Self {
                 dir,
                 file_size,
-                file_iter: Some(file_rx.into_iter()),
+                file_iter: None,
                 stopped,
                 file_alloc_task_handle: None,
+                file_count: 0,
             }
         }
     }
@@ -161,6 +142,7 @@ impl Drop for FilePipeline {
 impl Iterator for FilePipeline {
     type Item = io::Result<LockedFile>;

+    #[cfg(not(madsim))]
     fn next(&mut self) -> Option<Self::Item> {
         if self.stopped.load(Ordering::Relaxed) {
             return None;
@@ -171,6 +153,14 @@
             .next()
             .map(Ok)
     }
+
+    #[cfg(madsim)]
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.stopped.load(Ordering::Relaxed) {
+            return None;
+        }
+        Some(Self::alloc(&self.dir, self.file_size, &mut self.file_count))
+    }
 }

 impl std::fmt::Debug for FilePipeline {
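Outside madsim the pipeline still pre-allocates files on a background thread and hands them over through a bounded channel; under madsim it now skips the task and the channel entirely and allocates on demand inside `next`, tracking progress in the new `file_count` field. A sketch of the on-demand path with toy types (`alloc` stands in for the crate's file-allocation routine):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct Pipeline {
    stopped: Arc<AtomicBool>,
    file_count: usize,
}

impl Pipeline {
    // Stand-in for creating, locking, and returning a temp file.
    fn alloc(count: &mut usize) -> std::io::Result<usize> {
        *count += 1;
        Ok(*count)
    }

    // Mirrors the madsim `next` above: no channel, allocate lazily.
    fn next_file(&mut self) -> Option<std::io::Result<usize>> {
        if self.stopped.load(Ordering::Relaxed) {
            return None;
        }
        Some(Self::alloc(&mut self.file_count))
    }
}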
25 changes: 16 additions & 9 deletions crates/curp/src/server/storage/wal/segment.rs
@@ -96,13 +96,17 @@ impl WALSegment {
         &mut self,
     ) -> Result<impl Iterator<Item = LogEntry<C>>, WALError>
     where
-        C: Serialize + DeserializeOwned + 'static,
+        C: Serialize + DeserializeOwned + 'static + std::fmt::Debug,
     {
         let frame_batches = self.read_all(WAL::<C>::new())?;
+        let frame_batches_filtered: Vec<_> = frame_batches
+            .into_iter()
+            .filter(|b| !b.is_empty())
+            .collect();
         // The highest_index of this segment
         let mut highest_index = u64::MAX;
         // We get the last frame batch to check it's type
-        if let Some(frames) = frame_batches.last() {
+        if let Some(frames) = frame_batches_filtered.last() {
             let frame = frames
                 .last()
                 .unwrap_or_else(|| unreachable!("a batch should contains at least one frame"));
@@ -115,13 +119,16 @@
         self.update_seal_index(highest_index);

         // Get log entries that index is no larger than `highest_index`
-        Ok(frame_batches.into_iter().flatten().filter_map(move |f| {
-            if let DataFrameOwned::Entry(e) = f {
-                (e.index <= highest_index).then_some(e)
-            } else {
-                None
-            }
-        }))
+        Ok(frame_batches_filtered
+            .into_iter()
+            .flatten()
+            .filter_map(move |f| {
+                if let DataFrameOwned::Entry(e) = f {
+                    (e.index <= highest_index).then_some(e)
+                } else {
+                    None
+                }
+            }))
     }

     /// Seal the current segment
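Recovery now discards empty frame batches before inspecting the last one, so the `unreachable!` on `frames.last()` can no longer be hit by a trailing empty batch, and the same filtered list feeds the entry iterator. The filtering step, sketched with plain byte vectors:

// After filtering, `last()` always yields a batch with at least one frame.
fn last_frame(frame_batches: Vec<Vec<u8>>) -> Option<u8> {
    let filtered: Vec<_> = frame_batches
        .into_iter()
        .filter(|batch| !batch.is_empty())
        .collect();
    filtered.last().and_then(|frames| frames.last().copied())
}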
