Skip to content

Commit

Permalink
Complete reindex implementation (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyb0807 authored Dec 11, 2024
1 parent e5621fd commit f280330
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 15 deletions.
116 changes: 116 additions & 0 deletions rs/index/src/ivf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,46 @@ impl IvfBuilder {
Ok(assigned_ids)
}

/// Physically reorders the builder's vectors and doc ids so that each one sits at the
/// id returned for it by `get_reassigned_ids`.
///
/// The reordered vectors are appended to a fresh `FileBackedAppendableVectorStorage`
/// created under `temp_dir` with the same configuration as the current storage. The
/// builder's `vectors` and `doc_id_mapping` are replaced only after the whole
/// reordering has succeeded, so an error leaves both of them untouched.
///
/// # Errors
///
/// Returns an error instead of panicking when:
/// * `get_reassigned_ids` fails,
/// * the reassigned ids are not a permutation of `0..doc_id_mapping.len()`
///   (wrong length, out-of-range id, or duplicated id),
/// * a vector cannot be read from the current storage or appended to the new one.
pub fn reindex(&mut self, temp_dir: String) -> Result<()> {
    let assigned_ids = self.get_reassigned_ids()?;
    let num_vectors = self.doc_id_mapping.len();
    if assigned_ids.len() != num_vectors {
        return Err(anyhow!(
            "got {} reassigned ids for {} doc ids",
            assigned_ids.len(),
            num_vectors
        ));
    }

    // Build the reordered doc id mapping and the inverse permutation in one pass.
    // `old_id_at[new_id]` is the id the vector had before reindexing; `None` marks a
    // slot that has not been claimed yet (no sentinel values needed).
    let mut new_doc_id_mapping = self.doc_id_mapping.clone();
    let mut old_id_at: Vec<Option<usize>> = vec![None; num_vectors];
    for (old_id, (new_id, doc_id)) in assigned_ids
        .iter()
        .zip(self.doc_id_mapping.iter())
        .enumerate()
    {
        let new_id = *new_id as usize;
        let slot = old_id_at.get_mut(new_id).ok_or_else(|| {
            anyhow!(
                "reassigned id {} of vector {} is out of range for {} vectors",
                new_id,
                old_id,
                num_vectors
            )
        })?;
        if let Some(other_old_id) = slot.replace(old_id) {
            return Err(anyhow!(
                "vectors {} and {} were both reassigned to id {}",
                other_old_id,
                old_id,
                new_id
            ));
        }
        new_doc_id_mapping[new_id] = *doc_id;
    }

    // Put the vectors to their reassigned places.
    let vector_storage_config = self.vectors.config();
    let mut new_vector_storage = Box::new(FileBackedAppendableVectorStorage::<f32>::new(
        temp_dir,
        vector_storage_config.memory_threshold,
        vector_storage_config.file_size,
        vector_storage_config.num_features,
    ));

    for (new_id, old_id) in old_id_at.into_iter().enumerate() {
        // Cannot be `None` after the checks above (`num_vectors` distinct in-range
        // ids fill every slot), but report it rather than panic if that ever breaks.
        let old_id =
            old_id.ok_or_else(|| anyhow!("no vector was reassigned to id {}", new_id))?;
        // `into_iter().next()` extracts the success value without assuming whether
        // the storage lookup signals a miss through `Option` or `Result`.
        let vector = self
            .vectors
            .get(old_id as u32)
            .into_iter()
            .next()
            .ok_or_else(|| anyhow!("failed to read vector {} from vector storage", old_id))?;
        new_vector_storage.append(vector)?;
    }

    // Commit both halves together so the builder never ends up half-reindexed.
    self.vectors = new_vector_storage;
    self.doc_id_mapping = new_doc_id_mapping;
    Ok(())
}

pub fn cleanup(&mut self) -> Result<()> {
let vectors_path = format!("{}/builder_vector_storage", self.config.base_directory);
let centroids_path = format!("{}/builder_centroid_storage", self.config.base_directory);
Expand Down Expand Up @@ -1153,6 +1193,82 @@ mod tests {
assert_eq!(assigned_ids[23], 23);
}

/// Adds 22 one-dimensional vectors (value `i`, doc id `i + 100`), registers seven
/// posting lists, reindexes, and checks that both the vector storage and the doc id
/// mapping follow the expected reassigned order.
#[test]
fn test_ivf_builder_reindex() {
    const NUM_VECTORS: usize = 22;

    let temp_dir = tempdir::TempDir::new("ivf_builder_reindex_test")
        .expect("Failed to create temporary directory");
    let base_directory = temp_dir
        .path()
        .to_str()
        .expect("Failed to convert temporary directory path to string")
        .to_string();

    let config = IvfBuilderConfig {
        max_iteration: 1000,
        batch_size: 4,
        num_clusters: 4,
        num_data_points: NUM_VECTORS,
        max_clusters_per_vector: 2,
        distance_threshold: 0.1,
        base_directory: base_directory.clone(),
        memory_size: 1024,
        file_size: 4096,
        num_features: 1,
        tolerance: 0.0,
        max_posting_list_size: usize::MAX,
    };
    let mut builder = IvfBuilder::new(config).expect("Failed to create builder");

    // Vector `idx` holds the single feature `idx` and is registered as doc `idx + 100`.
    for idx in 0..NUM_VECTORS {
        builder
            .add_vector(idx as u64 + 100, vec![idx as f32])
            .expect("Vector should be added");
    }

    let posting_lists = [
        vec![11, 12, 13],
        vec![0, 2, 4, 6, 8, 20],
        vec![9, 18, 20],
        vec![14, 15, 16, 18],
        vec![1, 3, 5, 7, 18, 20],
        vec![10, 15, 21],
        vec![10, 15, 17, 19],
    ];
    for posting_list in &posting_lists {
        assert!(builder.add_posting_list(posting_list).is_ok());
    }

    builder
        .reindex(base_directory.clone())
        .expect("Failed to reindex");

    // After reindexing, position `i` must hold the vector listed at index `i` here.
    let expected_vectors: [f32; NUM_VECTORS] = [
        10.0, 14.0, 15.0, 1.0, 3.0, 5.0, 7.0, 9.0, 16.0, 18.0, 0.0, 2.0, 4.0, 6.0, 8.0, 20.0,
        11.0, 12.0, 13.0, 21.0, 17.0, 19.0,
    ];
    for (position, expected_value) in expected_vectors.iter().enumerate() {
        let stored = builder
            .vectors
            .get(position as u32)
            .expect(&format!("Failed to retrieve vector #{}", position));
        assert_eq!(stored[0], *expected_value);
    }

    // Doc ids must have moved together with their vectors (same order, offset by 100).
    let expected_doc_ids: [u64; NUM_VECTORS] = [
        10, 14, 15, 1, 3, 5, 7, 9, 16, 18, 0, 2, 4, 6, 8, 20, 11, 12, 13, 21, 17, 19,
    ];
    assert_eq!(builder.doc_id_mapping.len(), NUM_VECTORS);
    for (doc_id, expected_base) in builder.doc_id_mapping.iter().zip(expected_doc_ids.iter()) {
        assert_eq!(*doc_id, expected_base + 100);
    }
}

#[test]
fn test_ivf_builder() {
let temp_dir = tempdir::TempDir::new("ivf_builder_test")
Expand Down
6 changes: 3 additions & 3 deletions rs/index/src/ivf/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mod tests {

assert!(builder.build().is_ok());

assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

let reader = IvfReader::new(base_directory.clone());
let index = reader.read().expect("Failed to read index file");
Expand Down Expand Up @@ -200,7 +200,7 @@ mod tests {
}

assert!(builder.build().is_ok());
assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

let reader = IvfReader::new(base_directory.clone());
let index = reader.read().expect("Failed to read index file");
Expand All @@ -213,7 +213,7 @@ mod tests {
assert!(posting_list.is_ok());
let posting_list = posting_list.unwrap();

// It's possible that the posting list size is more than max_posting_list_size,
// It's possible that the posting list size is more than max_posting_list_size,
// but it should be less than 2x.
assert!(posting_list.len() <= 20);
}
Expand Down
18 changes: 15 additions & 3 deletions rs/index/src/ivf/writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fs::{remove_file, File};
use std::fs::{create_dir_all, remove_file, File};
use std::io::{BufWriter, Write};

use anyhow::{anyhow, Context, Result};
use log::debug;
use utils::io::{append_file_to_writer, wrap_write};

use crate::ivf::builder::IvfBuilder;
Expand All @@ -16,7 +17,18 @@ impl IvfWriter {
Self { base_directory }
}

pub fn write(&self, ivf_builder: &mut IvfBuilder) -> Result<()> {
pub fn write(&self, ivf_builder: &mut IvfBuilder, reindex: bool) -> Result<()> {
if reindex {
let temp_dir = format!("{}/temp", self.base_directory);
create_dir_all(&temp_dir).context("failed to create temp directory")?;

// Reindex the vectors for efficient lookup
ivf_builder
.reindex(temp_dir)
.context("failed to reindex during write")?;
debug!("Finish reindexing");
}

let num_features = ivf_builder.config().num_features;
let num_clusters = ivf_builder.centroids().len();
let num_vectors = ivf_builder.vectors().len();
Expand Down Expand Up @@ -374,7 +386,7 @@ mod tests {
assert_eq!(builder.doc_id_mapping().len(), 1000);

assert!(builder.build().is_ok());
assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

// Check if files were created and removed correctly
assert!(fs::metadata(format!("{}/vectors", base_directory)).is_ok());
Expand Down
2 changes: 1 addition & 1 deletion rs/index_writer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub enum IndexType {
pub struct BaseConfig {
pub output_path: String,
pub dimension: usize,
pub reindex: bool,

// Vector storage parameters
pub max_memory_size: usize,
Expand All @@ -35,7 +36,6 @@ pub struct HnswConfig {
pub num_layers: u8,
pub max_num_neighbors: usize,
pub ef_construction: u32,
pub reindex: bool,

// Quantizer parameters
pub quantizer_type: QuantizerType,
Expand Down
13 changes: 7 additions & 6 deletions rs/index_writer/src/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl IndexWriter {

info!("Start writing index");
let hnsw_writer = HnswWriter::new(hnsw_directory);
hnsw_writer.write(&mut hnsw_builder, index_builder_config.hnsw_config.reindex)?;
hnsw_writer.write(&mut hnsw_builder, index_builder_config.base_config.reindex)?;

// Cleanup tmp directory. It's ok to fail
std::fs::remove_dir_all(&pg_temp_dir).unwrap_or_default();
Expand Down Expand Up @@ -157,7 +157,7 @@ impl IndexWriter {

info!("Start writing index");
let ivf_writer = IvfWriter::new(ivf_directory);
ivf_writer.write(&mut ivf_builder)?;
ivf_writer.write(&mut ivf_builder, index_builder_config.base_config.reindex)?;

// Cleanup tmp directory. It's ok to fail
ivf_builder.cleanup()?;
Expand Down Expand Up @@ -256,11 +256,11 @@ impl IndexWriter {

info!("Start writing HNSW index for centroids");
let hnsw_writer = HnswWriter::new(hnsw_directory);
hnsw_writer.write(&mut hnsw_builder, hnsw_config.reindex)?;
hnsw_writer.write(&mut hnsw_builder, index_writer_config.base_config.reindex)?;

info!("Start writing IVF index");
let ivf_writer = IvfWriter::new(ivf_directory);
ivf_writer.write(&mut ivf_builder)?;
ivf_writer.write(&mut ivf_builder, index_writer_config.base_config.reindex)?;
ivf_builder.cleanup()?;

// Finally, write the index writer config
Expand Down Expand Up @@ -379,6 +379,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Hnsw,
Expand All @@ -387,7 +388,6 @@ mod tests {
num_layers: 2,
max_num_neighbors: 10,
ef_construction: 100,
reindex: false,
quantizer_type: QuantizerType::ProductQuantizer,
subvector_dimension: 2,
num_bits: 2,
Expand Down Expand Up @@ -447,6 +447,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Ivf,
Expand Down Expand Up @@ -509,6 +510,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Spann,
Expand All @@ -517,7 +519,6 @@ mod tests {
num_layers: 2,
max_num_neighbors: 10,
ef_construction: 100,
reindex: false,
quantizer_type: QuantizerType::ProductQuantizer,
subvector_dimension: 2,
num_bits: 2,
Expand Down
4 changes: 2 additions & 2 deletions rs/index_writer/src/scripts/write_index_writer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
use std::fs::File;
use std::io::Write;

use index_writer::config::{BaseConfig, HnswConfig, SpannConfigWithBase, IvfConfig, QuantizerType};
use index_writer::config::{BaseConfig, HnswConfig, IvfConfig, QuantizerType, SpannConfigWithBase};

fn main() -> std::io::Result<()> {
let mut base_config = BaseConfig::default();
base_config.reindex = false;
base_config.dimension = 128;
base_config.output_path = "NONE".to_string();
base_config.max_memory_size = 1024 * 1024 * 1024; // 1 GB
Expand All @@ -26,7 +27,6 @@ fn main() -> std::io::Result<()> {
hnsw_config.num_layers = 4;
hnsw_config.max_num_neighbors = 32;
hnsw_config.ef_construction = 200;
hnsw_config.reindex = false;

let mut config = SpannConfigWithBase::default();
config.base_config = base_config;
Expand Down

0 comments on commit f280330

Please sign in to comment.