From ca76a21288516bf18cad1ef8676e5cc4acf4ba9c Mon Sep 17 00:00:00 2001
From: BuildKite
Date: Thu, 5 Dec 2024 02:32:31 +0100
Subject: [PATCH] Use 2 separate mmaps for metadata and posting lists

---
 rs/index/src/ivf/builder.rs                |   2 -
 rs/index/src/ivf/index.rs                  |   3 +-
 rs/index/src/posting_list/combined_file.rs |  10 +-
 rs/index/src/posting_list/file.rs          | 440 +++++++++++----------
 rs/index/src/posting_list/fixed_file.rs    |   6 +-
 5 files changed, 236 insertions(+), 225 deletions(-)

diff --git a/rs/index/src/ivf/builder.rs b/rs/index/src/ivf/builder.rs
index 7738fc5a..ea566dc3 100644
--- a/rs/index/src/ivf/builder.rs
+++ b/rs/index/src/ivf/builder.rs
@@ -97,7 +97,6 @@ impl IvfBuilder {
             posting_lists_path,
             config.memory_size,
             config.file_size,
-            config.num_clusters,
         ));
 
         Ok(Self {
@@ -218,7 +217,6 @@ impl IvfBuilder {
             posting_list_storage_location,
             self.config.memory_size,
             self.config.file_size,
-            self.centroids.len(),
         ));
 
         // Move ownership of each posting list to the posting list storage
diff --git a/rs/index/src/ivf/index.rs b/rs/index/src/ivf/index.rs
index 6e7fec7f..bf24b3ec 100644
--- a/rs/index/src/ivf/index.rs
+++ b/rs/index/src/ivf/index.rs
@@ -238,7 +238,8 @@ mod tests {
         // Write posting lists
         assert!(file.write_all(&(num_clusters as u64).to_le_bytes()).is_ok());
         offset += size_of::<u64>();
-        let mut pl_offset = num_clusters * 2 * size_of::<u64>();
+        // Posting list offset starts at 0 (see FileBackedAppendablePostingListStorage)
+        let mut pl_offset = 0;
         for posting_list in posting_lists.iter() {
             let pl_len = posting_list.len();
             assert!(file.write_all(&(pl_len as u64).to_le_bytes()).is_ok());
diff --git a/rs/index/src/posting_list/combined_file.rs b/rs/index/src/posting_list/combined_file.rs
index 1e901d86..1c3c1795 100644
--- a/rs/index/src/posting_list/combined_file.rs
+++ b/rs/index/src/posting_list/combined_file.rs
@@ -135,13 +135,16 @@ impl FixedIndexFile {
         let metadata_offset = self.posting_list_metadata_offset
             + index * PL_METADATA_LEN * size_of::<u64>();
+
+        let posting_list_start_offset = self.posting_list_metadata_offset
+            + self.header.num_clusters as usize * PL_METADATA_LEN * size_of::<u64>();
+
         let slice = &self.mmap[metadata_offset..metadata_offset + size_of::<u64>()];
         let pl_len = u64::from_le_bytes(slice.try_into()?) as usize;
 
         let slice = &self.mmap[metadata_offset + size_of::<u64>()
             ..metadata_offset + PL_METADATA_LEN * size_of::<u64>()];
-        let pl_offset =
-            u64::from_le_bytes(slice.try_into()?) as usize + self.posting_list_metadata_offset;
+        let pl_offset = u64::from_le_bytes(slice.try_into()?) as usize + posting_list_start_offset;
 
         let slice = &self.mmap[pl_offset..pl_offset + pl_len * size_of::<u64>()];
         Ok(transmute_u8_to_slice::<u64>(slice))
     }
@@ -208,7 +211,8 @@ mod tests {
 
         // No need for padding here
         let posting_lists: Vec<Vec<u64>> = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8, 9, 10]];
-        let metadata: Vec<u64> = vec![4, 32, 6, 64];
+        // Posting list offset starts at 0 (see FileBackedAppendablePostingListStorage)
+        let metadata: Vec<u64> = vec![4, 0, 6, 32];
         assert!(file.write_all(&num_clusters).is_ok());
         assert!(file.write_all(transmute_slice_to_u8(&metadata)).is_ok());
         assert!(file
diff --git a/rs/index/src/posting_list/file.rs b/rs/index/src/posting_list/file.rs
index f25576aa..711d6c34 100644
--- a/rs/index/src/posting_list/file.rs
+++ b/rs/index/src/posting_list/file.rs
@@ -1,5 +1,6 @@
 use std::fs::{File, OpenOptions};
 use std::io::{BufWriter, Write};
+use std::mem::size_of;
 use std::vec;
 
 use anyhow::{anyhow, Result};
@@ -19,10 +20,8 @@ struct FileAccessInfo {
 pub struct FileBackedAppendablePostingListStorage {
     pub memory_threshold: usize,
     pub backing_file_size: usize,
-    // Number of clusters in the IVF. Each cluster should have one posting list.
-    num_clusters: usize,
     // Counter of appended posting lists.
-    current_num_of_posting_list: usize,
+    entry_count: usize,
     // Number of bytes required to store the data.
     // - If it's in memory, the data stored are the posting lists
     // - If it's on disk, the data stored are the posting lists + metadata
@@ -32,23 +31,26 @@ pub struct FileBackedAppendablePostingListStorage {
     resident_posting_lists: Vec<Vec<u64>>,
     resident: bool,
 
-    // Only has value if we spill to disk
+    // Only has value if we spill to disk. We need to separate metadata and posting lists
+    // into 2 different files because num_clusters is not known beforehand.
     base_directory: String,
+    // Backing files for posting list metadata
+    metadata_mmaps: Vec<memmap2::MmapMut>,
+    metadata_current_overall_offset: usize,
+    // Backing files for posting lists themselves
     mmaps: Vec<memmap2::MmapMut>,
-    current_backing_id: i32,
-    current_offset: usize,
-    offset_to_current_posting_list: u64,
+    current_overall_offset: usize,
+    // We currently use the overall offset since it would be directly usable by queries to find
+    // the appropriate posting list. We could also use a (backing_id, offset_in_file) pair to
+    // track current positions, but we'll need to convert that back to an overall offset on
+    // storage's `write`.
+    // TODO(tyb): maybe consider using a (backing_id, offset_in_file) pair for consistency with
+    // FileBackedAppendableVectorStorage. Or switch to overall offset for both.
 }
 
 impl FileBackedAppendablePostingListStorage {
-    pub fn new(
-        base_directory: String,
-        memory_threshold: usize,
-        backing_file_size: usize,
-        num_clusters: usize,
-    ) -> Self {
-        let pl_metadata_in_bytes = PL_METADATA_LEN * std::mem::size_of::<u64>();
-        let offset_to_current_posting_list = (num_clusters * pl_metadata_in_bytes) as u64;
+    pub fn new(base_directory: String, memory_threshold: usize, backing_file_size: usize) -> Self {
+        let pl_metadata_in_bytes = PL_METADATA_LEN * size_of::<u64>();
         // Rounding to PL_METADATA_LEN * `u64` size in bytes to at least simplify the reading of
         // posting list offsets and lengths.
         //
@@ -57,28 +59,23 @@ impl FileBackedAppendablePostingListStorage {
         let rounded_backing_file_size =
             backing_file_size / pl_metadata_in_bytes * pl_metadata_in_bytes;
         Self {
-            base_directory,
             memory_threshold,
             backing_file_size: rounded_backing_file_size,
-            num_clusters,
-            current_num_of_posting_list: 0,
+            entry_count: 0,
+            size_bytes: 0,
             resident_posting_lists: vec![],
             resident: true,
-            size_bytes: 0,
+
+            base_directory,
+            metadata_mmaps: vec![],
+            metadata_current_overall_offset: 0,
             mmaps: vec![],
-            current_backing_id: -1,
-            current_offset: 0,
-            offset_to_current_posting_list,
+            current_overall_offset: 0,
         }
     }
 
     pub fn new_with_config(base_directory: String, config: PostingListStorageConfig) -> Self {
-        Self::new(
-            base_directory,
-            config.memory_threshold,
-            config.file_size,
-            config.num_clusters,
-        )
+        Self::new(base_directory, config.memory_threshold, config.file_size)
     }
 
     pub fn is_resident(&self) -> bool {
@@ -88,6 +85,9 @@ impl FileBackedAppendablePostingListStorage {
     pub fn flush(&mut self) -> Result<()> {
         if !self.resident {
             // Flush all mmaps
+            for mmap in self.metadata_mmaps.iter_mut() {
+                mmap.flush()?;
+            }
             for mmap in self.mmaps.iter_mut() {
                 mmap.flush()?;
             }
@@ -95,80 +95,83 @@ impl FileBackedAppendablePostingListStorage {
         Ok(())
     }
 
-    fn new_backing_file(&mut self) -> Result<()> {
-        self.current_backing_id += 1;
+    fn new_backing_file(&mut self, is_metadata: bool) -> Result<()> {
+        let (file_type, mmaps) = if is_metadata {
+            ("metadata", &mut self.metadata_mmaps)
+        } else {
+            ("posting_list", &mut self.mmaps)
+        };
+        let backing_file_size = self.backing_file_size as u64;
+        let base_directory = self.base_directory.clone();
+
+        let file_name = format!("{}/{}.bin.{}", base_directory, file_type, mmaps.len());
+
         let backing_file = OpenOptions::new()
             .read(true)
             .write(true)
             .create(true)
-            .open(format!(
-                "{}/posting_list.bin.{}",
-                self.base_directory, self.current_backing_id
-            ))?;
+            .open(file_name)?;
 
-        backing_file.set_len(self.backing_file_size as u64)?;
+        backing_file.set_len(backing_file_size)?;
+
+        mmaps.push(unsafe { memmap2::MmapMut::map_mut(&backing_file)? });
 
-        self.mmaps
-            .push(unsafe { memmap2::MmapMut::map_mut(&backing_file)? });
-        self.current_offset = 0;
         Ok(())
     }
 
-    fn write_to_current_mmap(&mut self, data: &[u8]) -> Result<()> {
-        let mmap = &mut self.mmaps[self.current_backing_id as usize];
+    fn write_to_current_mmap(&mut self, data: &[u8], is_metadata: bool) -> Result<()> {
+        let (offset, mmaps) = if is_metadata {
+            (
+                &self.metadata_current_overall_offset,
+                &mut self.metadata_mmaps,
+            )
+        } else {
+            (&self.current_overall_offset, &mut self.mmaps)
+        };
+
+        let file_access_info =
+            Self::offset_to_file_access_info(*offset, self.backing_file_size, mmaps)?;
+
+        let mmap = &mut mmaps[file_access_info.file_num];
         let write_size = data.len();
-        mmap[self.current_offset..self.current_offset + write_size].copy_from_slice(data);
-        self.current_offset += write_size;
+        mmap[file_access_info.file_offset..file_access_info.file_offset + write_size]
+            .copy_from_slice(data);
+        if is_metadata {
+            self.metadata_current_overall_offset += write_size;
+        } else {
+            self.current_overall_offset += write_size;
+        }
         Ok(())
     }
 
-    fn get_posting_list_offset(&self) -> usize {
-        let pl_metadata_in_bytes = PL_METADATA_LEN * std::mem::size_of::<u64>();
-        self.num_clusters * pl_metadata_in_bytes
-    }
-
     fn flush_resident_posting_lists_to_disk(&mut self) -> Result<()> {
         if !self.resident {
             return Err(anyhow!(
                 "Posting lists should still be in memory when flushing to disk"
            ));
         }
-        // Trigger the creation of a new file when we start to flush
-        self.current_offset = self.backing_file_size;
-        let size_in_bytes = std::mem::size_of::<u64>();
+        // Extract all the data we need from self to avoid an immutable borrow issue
         let posting_lists: Vec<Vec<u64>> = std::mem::take(&mut self.resident_posting_lists);
-        // First write the metadata
+
         for posting_list in &posting_lists {
-            if self.current_offset == self.backing_file_size {
-                self.new_backing_file()?;
+            // First write the metadata
+            if self.metadata_current_overall_offset % self.backing_file_size == 0 {
+                self.new_backing_file(true)?;
             }
             // Write the length of the posting list
-            self.write_to_current_mmap(&posting_list.len().to_le_bytes())?;
-
+            self.write_to_current_mmap(&posting_list.len().to_le_bytes(), true)?;
             // Write the offset to the current posting list
-            self.write_to_current_mmap(&self.offset_to_current_posting_list.to_le_bytes())?;
-            self.offset_to_current_posting_list += (posting_list.len() * size_in_bytes) as u64;
-        }
-        // Now write the posting lists
-        let metadata_tot_size = self.get_posting_list_offset();
-        let file_num_for_first_posting_list = metadata_tot_size / self.backing_file_size;
-        // file_num_for_first_posting_list is an index, while self.mmaps.len() is the length
-        while self.mmaps.len() - 1 < file_num_for_first_posting_list {
-            self.new_backing_file()?;
-        }
-        let file_access_info = self.offset_to_file_access_info(self.get_posting_list_offset())?;
-        self.current_offset = file_access_info.file_offset;
-        self.current_backing_id = file_access_info.file_num as i32;
+            self.write_to_current_mmap(&self.current_overall_offset.to_le_bytes(), true)?;
 
-        for posting_list in &posting_lists {
+            // Now write the posting lists
             for idx in posting_list {
-                if self.current_offset == self.backing_file_size {
-                    self.new_backing_file()?;
+                if self.current_overall_offset % self.backing_file_size == 0 {
+                    self.new_backing_file(false)?;
                 }
-                self.write_to_current_mmap(&idx.to_le_bytes())?;
+                self.write_to_current_mmap(&idx.to_le_bytes(), false)?;
             }
         }
 
@@ -177,60 +180,45 @@ impl FileBackedAppendablePostingListStorage {
         Ok(())
     }
 
-    // The caller is responsible for setting self.current_offset to the right position
-    // for writing metadata.
+    // The caller is responsible for setting self.resident to false.
     fn append_posting_list_to_disk(&mut self, posting_list: &[u64]) -> Result<()> {
         if self.resident {
             return Err(anyhow!("Posting lists should already be flushed to disk"));
         }
 
         // First write the metadata
-        if self.current_offset == self.backing_file_size {
-            self.new_backing_file()?;
+        if self.metadata_current_overall_offset % self.backing_file_size == 0 {
+            self.new_backing_file(true)?;
         }
         // Write the length of the posting list
-        self.write_to_current_mmap(&posting_list.len().to_le_bytes())?;
+        self.write_to_current_mmap(&posting_list.len().to_le_bytes(), true)?;
         // Write the offset to the current posting list
-        self.write_to_current_mmap(&self.offset_to_current_posting_list.to_le_bytes())?;
+        self.write_to_current_mmap(&self.current_overall_offset.to_le_bytes(), true)?;
 
         // Now write the posting list
-        let size_in_bytes = std::mem::size_of::<u64>();
-        let metadata_tot_size = self.get_posting_list_offset();
-        let file_num_for_first_posting_list = metadata_tot_size / self.backing_file_size;
-        // file_num_for_first_posting_list is an index, while self.mmaps.len() is the length
-        while self.mmaps.len() - 1 < file_num_for_first_posting_list {
-            self.new_backing_file()?;
-        }
-        let file_access_info =
-            self.offset_to_file_access_info(self.offset_to_current_posting_list as usize)?;
-        self.current_offset = file_access_info.file_offset;
-        self.current_backing_id = file_access_info.file_num as i32;
-
-        self.offset_to_current_posting_list += (posting_list.len() * size_in_bytes) as u64;
-
         for idx in posting_list.iter() {
-            if self.current_offset == self.backing_file_size {
-                self.new_backing_file()?;
+            if self.current_overall_offset % self.backing_file_size == 0 {
+                self.new_backing_file(false)?;
             }
-            self.write_to_current_mmap(&idx.to_le_bytes())?;
-        }
-
-        if self.current_offset == self.backing_file_size {
-            self.new_backing_file()?;
+            self.write_to_current_mmap(&idx.to_le_bytes(), false)?;
         }
 
-        self.current_num_of_posting_list += 1;
+        self.entry_count += 1;
         Ok(())
     }
 
-    fn offset_to_file_access_info(&self, offset: usize) -> Result<FileAccessInfo> {
-        let file_num = offset / self.backing_file_size;
-        if file_num >= self.mmaps.len() {
+    fn offset_to_file_access_info(
+        offset: usize,
+        file_size: usize,
+        mmaps: &[memmap2::MmapMut],
+    ) -> Result<FileAccessInfo> {
+        let file_num = offset / file_size;
+        if file_num >= mmaps.len() {
             return Err(anyhow!("File number out of bound"));
         }
-        let file_offset = offset % self.backing_file_size;
-        if file_offset >= self.mmaps[file_num].len() {
+        let file_offset = offset % file_size;
+        if file_offset >= mmaps[file_num].len() {
             return Err(anyhow!("File offset out of bound"));
         }
 
@@ -239,6 +227,51 @@ impl FileBackedAppendablePostingListStorage {
             file_offset,
         })
     }
+
+    fn get_slice(
+        mmaps: &[memmap2::MmapMut],
+        start: usize,
+        len: usize,
+        file_size: usize,
+    ) -> Result<&[u8]> {
+        let file_access_info = Self::offset_to_file_access_info(start, file_size, mmaps)?;
+        let (file_offset, file_num) = (file_access_info.file_offset, file_access_info.file_num);
+        let mmap = &mmaps[file_num];
+        Ok(&mmap[file_offset..file_offset + len])
+    }
+
+    fn write_mmaps(
+        writer: &mut BufWriter<&mut File>,
+        mmaps: &[memmap2::MmapMut],
+        offset: usize,
+        backing_file_size: usize,
+    ) -> Result<usize> {
+        let file_access_info = Self::offset_to_file_access_info(offset, backing_file_size, mmaps)?;
+        let file_offset = file_access_info.file_offset;
+        let file_num = file_access_info.file_num;
+
+        let mut total_bytes_written = 0;
+        for (i, mmap) in mmaps.iter().enumerate() {
+            let bytes_to_write = if i == file_num {
+                file_offset as usize
+            } else {
+                mmap.len()
+            };
+
+            let bytes_written = wrap_write(writer, &mmap[..bytes_to_write])?;
+            total_bytes_written += bytes_written;
+
+            if bytes_written != bytes_to_write {
+                return Err(anyhow!(
+                    "Failed to write entire mmap: expected {} bytes, wrote {} bytes",
+                    bytes_to_write,
+                    bytes_written
+                ));
+            }
+        }
+
+        Ok(total_bytes_written)
+    }
 }
 
 impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
@@ -254,30 +287,33 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
             )]));
         }
 
-        if i >= self.current_num_of_posting_list {
+        if i >= self.entry_count {
             return Err(anyhow!("Posting list id out of bound"));
         }
 
-        let size_in_bytes = std::mem::size_of::<u64>();
-        let offset_to_pl_metadata = i * PL_METADATA_LEN * size_in_bytes;
+        let u64_bytes = size_of::<u64>();
+        let metadata_len = PL_METADATA_LEN * u64_bytes;
+        let metadata_offset = i * metadata_len;
 
-        let file_access_info = self.offset_to_file_access_info(offset_to_pl_metadata)?;
-        let file_offset = file_access_info.file_offset as usize;
-        let file_num = file_access_info.file_num as usize;
-        let mmap = &self.mmaps[file_num];
-        let slice = &mmap[file_offset..file_offset + size_in_bytes];
-        let pl_len = u64::from_le_bytes(slice.try_into()?) as usize;
-        let slice =
-            &mmap[file_offset + size_in_bytes..file_offset + PL_METADATA_LEN * size_in_bytes];
-        let pl_offset = u64::from_le_bytes(slice.try_into()?) as usize;
+        let metadata_slice = Self::get_slice(
+            &self.metadata_mmaps,
+            metadata_offset,
+            metadata_len,
+            self.backing_file_size,
+        )?;
+
+        let pl_len = u64::from_le_bytes(metadata_slice[..u64_bytes].try_into()?) as usize;
+        let pl_offset = u64::from_le_bytes(metadata_slice[u64_bytes..].try_into()?) as usize;
 
-        let file_access_info = self.offset_to_file_access_info(pl_offset)?;
-        let required_size = pl_len * size_in_bytes;
+        let file_access_info =
+            Self::offset_to_file_access_info(pl_offset, self.backing_file_size, &self.mmaps)?;
+        let required_size = pl_len * u64_bytes;
 
         // Posting list fits within a single mmap
         let file_offset = file_access_info.file_offset as usize;
         let file_num = file_access_info.file_num as usize;
+        let mmap = &self.mmaps[file_num];
+
         if file_offset + required_size <= mmap.len() {
-            let mmap = &self.mmaps[file_num as usize];
             let slice = &mmap[file_offset..file_offset + required_size];
             return Ok(PostingList::new_with_slices(vec![slice]));
         }
@@ -290,9 +326,9 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
         while remaining_elem > 0 {
             let mmap = &self.mmaps[current_file_num];
             let bytes_left_in_mmap = mmap.len() - current_offset;
-            let elems_in_mmap = std::cmp::min(remaining_elem, bytes_left_in_mmap / size_in_bytes);
+            let elems_in_mmap = std::cmp::min(remaining_elem, bytes_left_in_mmap / u64_bytes);
 
-            let slice = &mmap[current_offset..current_offset + elems_in_mmap * size_in_bytes];
+            let slice = &mmap[current_offset..current_offset + elems_in_mmap * u64_bytes];
             posting_list.add_slice(slice);
 
             remaining_elem -= elems_in_mmap;
@@ -301,7 +337,7 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
                 current_file_num += 1;
                 current_offset = 0;
                 if current_file_num >= self.mmaps.len() {
-                    return Err(anyhow!("Current file nunber out of bound"));
+                    return Err(anyhow!("Current file number out of bound"));
                 }
             }
         }
@@ -310,46 +346,29 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
     }
 
     fn append(&mut self, posting_list: &[u64]) -> Result<()> {
-        if self.current_num_of_posting_list == self.num_clusters {
-            return Err(anyhow!(
-                "Trying to append more posting lists than number of clusters"
-            ));
-        }
-        let required_size = posting_list.len() * std::mem::size_of::<u64>();
+        let required_size = posting_list.len() * size_of::<u64>();
         self.size_bytes += required_size;
         let should_flush = self.resident && self.size_bytes > self.memory_threshold;
         let flush = should_flush && !self.resident_posting_lists.is_empty();
 
         // Good case, where file is still resident
         if self.resident && !should_flush {
-            self.current_num_of_posting_list += 1;
+            self.entry_count += 1;
             self.resident_posting_lists.push(posting_list.to_vec());
             return Ok(());
         }
 
         // Spill to disk, creating new files if necessary
-        let pl_metadata_in_bytes = PL_METADATA_LEN * std::mem::size_of::<u64>();
+        let pl_metadata_in_bytes = PL_METADATA_LEN * size_of::<u64>();
         if flush {
             self.flush_resident_posting_lists_to_disk()?;
-            // At this point we are not in memory anymore, we'll need to
-            // take into account the storage for metadata.
-            self.size_bytes += self.resident_posting_lists.len() * pl_metadata_in_bytes;
         }
         self.size_bytes += pl_metadata_in_bytes;
 
         // We should spill to disk, but did not flush (there was nothing to flush)
         if self.resident && !flush {
-            // Trigger the creation of a new file since this is the first time we write
-            // to disk
-            self.current_offset = self.backing_file_size;
             // Flip the flag since we are technically on disk at this point
             self.resident = false;
-        } else {
-            // Adjust the offset to write metadata first
-            let metadata_offset = self.current_num_of_posting_list * pl_metadata_in_bytes;
-            let file_access_info = self.offset_to_file_access_info(metadata_offset)?;
-            self.current_offset = file_access_info.file_offset;
-            self.current_backing_id = file_access_info.file_num as i32;
         }
         self.append_posting_list_to_disk(posting_list)?;
         Ok(())
@@ -363,24 +382,21 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
             self.flush_resident_posting_lists_to_disk()?;
         }
 
-        for (i, mmap) in self.mmaps.iter().enumerate() {
-            let bytes_to_write = if i as i32 == self.current_backing_id {
-                self.current_offset
-            } else {
-                mmap.len()
-            };
-
-            let bytes_written = wrap_write(writer, &mmap[..bytes_to_write])?;
-            total_bytes_written += bytes_written;
+        // First write the metadata
+        total_bytes_written += Self::write_mmaps(
+            writer,
+            &self.metadata_mmaps,
+            self.metadata_current_overall_offset,
+            self.backing_file_size,
+        )?;
 
-            if bytes_written != bytes_to_write {
-                return Err(anyhow!(
-                    "Failed to write entire mmap: expected {} bytes, wrote {} bytes",
-                    bytes_to_write,
-                    bytes_written
-                ));
-            }
-        }
+        // Now write the posting lists
+        total_bytes_written += Self::write_mmaps(
+            writer,
+            &self.mmaps,
+            self.current_overall_offset,
+            self.backing_file_size,
+        )?;
 
         writer.flush()?;
 
@@ -388,14 +404,14 @@ impl<'a> PostingListStorage<'a> for FileBackedAppendablePostingListStorage {
     }
 
     fn len(&self) -> usize {
-        self.current_num_of_posting_list
+        self.entry_count
     }
 
     fn config(&self) -> PostingListStorageConfig {
         PostingListStorageConfig {
             memory_threshold: self.memory_threshold,
             file_size: self.backing_file_size,
-            num_clusters: self.num_clusters,
+            num_clusters: self.entry_count,
         }
     }
 }
@@ -415,7 +431,6 @@ mod tests {
             base_directory,
             1024, // memory_threshold
             4096, // backing_file_size
-            10,   // num_clusters
         );
 
         let pl1 = vec![1, 2, 3];
@@ -426,7 +441,7 @@ mod tests {
         assert_eq!(storage.resident_posting_lists.len(), 2);
         assert_eq!(
             storage.size_bytes,
-            (pl1.len() + pl2.len()) * std::mem::size_of::<u64>()
+            (pl1.len() + pl2.len()) * size_of::<u64>()
         );
         assert_eq!(
            storage
@@ -454,7 +469,6 @@ mod tests {
             base_directory,
             10,   // set a low threshold to force flushing to disk
             4096, // backing_file_size
-            10,   // num_clusters
         );
 
         let pl1 = vec![1, 2, 3];
@@ -493,38 +507,21 @@ mod tests {
         );
 
         // Verify posting list metadata
-        assert!(storage.mmaps.len() == 1);
-        let size_in_bytes = std::mem::size_of::<u64>();
-        let metadata_size = PL_METADATA_LEN * size_in_bytes; // length and offset
-        let mmap = &storage.mmaps[0];
+        assert!(storage.metadata_mmaps.len() == 1);
+        let u64_bytes = size_of::<u64>();
+        let metadata_size = PL_METADATA_LEN * u64_bytes; // length and offset
+        let mmap = &storage.metadata_mmaps[0];
 
         // Read length
-        let length_bytes: [u8; 8] = mmap[0..size_in_bytes].try_into().unwrap();
+        let length_bytes: [u8; 8] = mmap[0..u64_bytes].try_into().unwrap();
         let length = u64::from_le_bytes(length_bytes);
         assert_eq!(length, pl1.len() as u64);
 
         // Read offset
-        let offset_bytes: [u8; 8] = mmap[size_in_bytes..metadata_size].try_into().unwrap();
+        let offset_bytes: [u8; 8] = mmap[u64_bytes..metadata_size].try_into().unwrap();
         let offset = u64::from_le_bytes(offset_bytes);
 
         // Verify that the offset points to the correct location
-        let expected_offset = storage.get_posting_list_offset();
-        assert_eq!(offset, expected_offset as u64);
-    }
-
-    #[test]
-    fn test_append_more_than_num_clusters() {
-        let tempdir = tempdir::TempDir::new("append_more_than_num_clusters_test").unwrap();
-        let base_directory = tempdir.path().to_str().unwrap().to_string();
-        let mut storage = FileBackedAppendablePostingListStorage::new(
-            base_directory,
-            10,   // set a low threshold to force flushing to disk
-            4096, // backing_file_size
-            2,    // num_clusters
-        );
-
-        storage.append(&[1, 2, 3]).unwrap();
-        storage.append(&[4, 5, 6]).unwrap();
-        assert!(storage.append(&[7, 8, 9]).is_err());
+        assert_eq!(offset, 0);
     }
 
     #[test]
@@ -535,7 +532,6 @@ mod tests {
             base_directory,
             10,   // set a low threshold to force flushing to disk
             4096, // backing_file_size
-            2,    // num_clusters
         );
         assert!(storage.get(0).is_err());
     }
@@ -548,7 +544,6 @@ mod tests {
             base_directory,
             10, // set a low threshold to force flushing to disk
             32, // small backing file size to force multiple mmaps
-            2,  // num_clusters
         );
 
         let large_pl = (0..100).collect::<Vec<u64>>();
@@ -564,15 +559,13 @@ mod tests {
         assert_eq!(retrieved_pl, large_pl);
 
         // Verify that the posting list data spans multiple mmaps
-        let size_in_bytes = std::mem::size_of::<u64>();
-        let data_size = large_pl.len() * size_in_bytes;
-        let first_mmap_data_size = storage.backing_file_size - storage.get_posting_list_offset();
-        assert!(data_size > first_mmap_data_size);
+        let u64_bytes = size_of::<u64>();
+        let data_size = large_pl.len() * u64_bytes;
+        assert!(data_size > storage.backing_file_size);
 
         // Calculate how many mmaps should be used
-        let expected_mmap_count = 2
-            + (data_size - first_mmap_data_size + storage.backing_file_size - 1)
-                / storage.backing_file_size;
+        let expected_mmap_count =
+            (data_size + storage.backing_file_size - 1) / storage.backing_file_size;
         assert_eq!(storage.mmaps.len(), expected_mmap_count);
     }
 
@@ -580,8 +573,7 @@ mod tests {
     fn test_offset_to_file_access_info() {
         let tempdir = tempdir::TempDir::new("offset_to_file_access_info_test").unwrap();
         let base_directory = tempdir.path().to_str().unwrap().to_string();
-        let mut storage =
-            FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096, 2);
+        let mut storage = FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096);
         storage
             .mmaps
             .push(memmap2::MmapMut::map_anon(4096).unwrap());
@@ -589,19 +581,30 @@ mod tests {
             .mmaps
             .push(memmap2::MmapMut::map_anon(4096).unwrap());
 
-        let result = storage.offset_to_file_access_info(4500).unwrap();
+        let result = FileBackedAppendablePostingListStorage::offset_to_file_access_info(
+            4500,
+            storage.backing_file_size,
+            &storage.mmaps,
+        )
+        .unwrap();
         assert_eq!(result.file_num, 1);
         assert_eq!(result.file_offset, 404);
 
-        assert!(storage.offset_to_file_access_info(10000).is_err());
+        assert!(
+            FileBackedAppendablePostingListStorage::offset_to_file_access_info(
+                10000,
+                storage.backing_file_size,
+                &storage.mmaps
+            )
+            .is_err()
+        );
     }
 
     #[test]
     fn test_flush_resident_posting_lists_to_disk() {
         let tempdir = tempdir::TempDir::new("flush_resident_posting_lists_to_disk_test").unwrap();
         let base_directory = tempdir.path().to_str().unwrap().to_string();
-        let mut storage =
-            FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096, 2);
+        let mut storage = FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096);
 
         storage.append(&[1, 2, 3]).unwrap();
         storage.append(&[4, 5, 6]).unwrap();
@@ -631,10 +634,8 @@ mod tests {
     fn test_append_posting_list_to_disk() {
         let tempdir = tempdir::TempDir::new("append_resident_posting_list_to_disk_test").unwrap();
         let base_directory = tempdir.path().to_str().unwrap().to_string();
-        let mut storage =
-            FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096, 2);
+        let mut storage = FileBackedAppendablePostingListStorage::new(base_directory, 1024, 4096);
         // Trigger the creation of a new file
-        storage.current_offset = storage.backing_file_size;
         storage.resident = false;
 
         storage.append_posting_list_to_disk(&[1, 2, 3]).unwrap();
@@ -655,13 +656,11 @@ mod tests {
 
         let memory_threshold = 1024 * 1024; // 1 MB
         let backing_file_size = 512 * 1024; // 512 KB
-        let num_clusters = 5;
 
         let mut storage = FileBackedAppendablePostingListStorage::new(
             base_directory,
             memory_threshold,
             backing_file_size,
-            num_clusters,
         );
 
         // Create large posting lists that will span across multiple mmaps
@@ -675,7 +674,7 @@ mod tests {
         storage.append(&large_posting_list_3).unwrap();
 
         // Verify that the posting lists were appended correctly
-        assert_eq!(storage.current_num_of_posting_list, 3);
+        assert_eq!(storage.entry_count, 3);
         assert!(!storage.resident);
 
         // Retrieve and verify the posting lists
@@ -717,7 +716,7 @@ mod tests {
         let tempdir = tempdir::TempDir::new("write_and_verify_test").unwrap();
         let base_directory = tempdir.path().to_str().unwrap().to_string();
         let output_path = format!("{}/output", base_directory);
-        let mut storage = FileBackedAppendablePostingListStorage::new(base_directory, 10, 4096, 2);
+        let mut storage = FileBackedAppendablePostingListStorage::new(base_directory, 10, 4096);
 
         let pl1 = vec![1, 2, 3];
         let pl2 = vec![4, 5, 6, 7];
@@ -747,15 +746,15 @@ mod tests {
         let mut reader = std::io::BufReader::new(&output_file);
 
         // Read number of clusters
-        let mut num_clusters_bytes = [0u8; std::mem::size_of::<u64>()];
+        let mut num_clusters_bytes = [0u8; size_of::<u64>()];
         reader.read_exact(&mut num_clusters_bytes).unwrap();
         let num_clusters = u64::from_le_bytes(num_clusters_bytes);
-        assert_eq!(num_clusters, storage.num_clusters as u64);
+        assert_eq!(num_clusters, storage.len() as u64);
 
         // Read metadata for each posting list
-        for i in 0..PL_METADATA_LEN {
-            let mut length_bytes = [0u8; std::mem::size_of::<u64>()];
-            let mut offset_bytes = [0u8; std::mem::size_of::<u64>()];
+        for i in 0..storage.len() {
+            let mut length_bytes = [0u8; size_of::<u64>()];
+            let mut offset_bytes = [0u8; size_of::<u64>()];
 
             reader.read_exact(&mut length_bytes).unwrap();
             reader.read_exact(&mut offset_bytes).unwrap();
@@ -763,7 +762,14 @@ mod tests {
             let offset = u64::from_le_bytes(offset_bytes);
 
             assert_eq!(length as usize, if i == 0 { pl1.len() } else { pl2.len() });
-            assert!(offset > 0);
+            assert_eq!(
+                offset as usize,
+                if i == 0 {
+                    0
+                } else {
+                    pl1.len() * size_of::<u64>()
+                }
+            );
         }
 
         // Read and verify posting lists
diff --git a/rs/index/src/posting_list/fixed_file.rs b/rs/index/src/posting_list/fixed_file.rs
index eb37c5be..8c3d0579 100644
--- a/rs/index/src/posting_list/fixed_file.rs
+++ b/rs/index/src/posting_list/fixed_file.rs
@@ -37,11 +37,13 @@ impl FixedFilePostingListStorage {
         let data_offset = 8;
         let metadata_offset = data_offset + index * PL_METADATA_LEN * size_in_bytes;
 
+        let pl_start_offset = data_offset + self.num_clusters * PL_METADATA_LEN * size_in_bytes;
+
         let slice = &self.mmap[metadata_offset..metadata_offset + size_in_bytes];
         let pl_len = u64::from_le_bytes(slice.try_into()?) as usize;
         let slice = &self.mmap
             [metadata_offset + size_in_bytes..metadata_offset + PL_METADATA_LEN * size_in_bytes];
-        let pl_offset = u64::from_le_bytes(slice.try_into()?) as usize + data_offset;
+        let pl_offset = u64::from_le_bytes(slice.try_into()?) as usize + pl_start_offset;
 
         let slice = &self.mmap[pl_offset..pl_offset + pl_len * std::mem::size_of::<u64>()];
         Ok(transmute_u8_to_slice::<u64>(slice))
@@ -63,7 +65,7 @@ mod tests {
         let tempdir = tempdir::TempDir::new("fixed_file_posting_list_storage_test").unwrap();
         let base_directory = tempdir.path().to_str().unwrap().to_string();
         let mut appendable_storage =
-            FileBackedAppendablePostingListStorage::new(base_directory.clone(), 1024, 4096, 3);
+            FileBackedAppendablePostingListStorage::new(base_directory.clone(), 1024, 4096);
         appendable_storage
             .append(&vec![1, 2, 3, 4])
             .expect("Failed to append posting list");
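
Reviewer note (not part of the patch): with metadata and posting lists in separate mmap chains, every read translates a single overall offset into a (backing file, offset within file) pair, as offset_to_file_access_info does above. A minimal standalone sketch of that arithmetic follows; the function and variable names here are illustrative, not identifiers from the crate.

// Illustrative only: mirrors the overall-offset -> (file_num, file_offset)
// translation used by the patch, in a dependency-free form.
fn offset_to_file_access(offset: usize, file_size: usize, num_files: usize) -> Option<(usize, usize)> {
    let file_num = offset / file_size; // which backing file the offset lands in
    let file_offset = offset % file_size; // position inside that file
    if file_num >= num_files {
        return None; // offset points past the last backing file
    }
    Some((file_num, file_offset))
}

fn main() {
    // With 4096-byte backing files, overall offset 4500 lands in file 1 at
    // offset 404, matching test_offset_to_file_access_info above.
    assert_eq!(offset_to_file_access(4500, 4096, 2), Some((1, 404)));
    assert_eq!(offset_to_file_access(10000, 4096, 2), None);
}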
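A second sketch of the on-disk metadata record the patch writes: each posting list gets PL_METADATA_LEN little-endian u64s (length, then an offset now relative to the start of the posting-list region, so the first list's offset is 0). PL_METADATA_LEN is 2 per the "length and offset" comment in the tests; the encode/decode helper names below are hypothetical.

// Illustrative only: round-trips the (length, offset) metadata record format.
fn encode_metadata(len: u64, offset: u64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..8].copy_from_slice(&len.to_le_bytes()); // posting list length in elements
    buf[8..].copy_from_slice(&offset.to_le_bytes()); // offset into the posting-list region
    buf
}

fn decode_metadata(buf: &[u8; 16]) -> (u64, u64) {
    let len = u64::from_le_bytes(buf[..8].try_into().unwrap());
    let offset = u64::from_le_bytes(buf[8..].try_into().unwrap());
    (len, offset)
}

fn main() {
    // Matches `metadata: Vec<u64> = vec![4, 0, 6, 32]` in the combined_file test:
    // the first list (4 u64s) starts at 0, the second at 4 * 8 = 32 bytes.
    for (len, off) in [(4u64, 0u64), (6, 32)] {
        assert_eq!(decode_metadata(&encode_metadata(len, off)), (len, off));
    }
}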