Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete reindex implementation #188

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions rs/index/src/ivf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,46 @@ impl IvfBuilder {
Ok(assigned_ids)
}

pub fn reindex(&mut self, temp_dir: String) -> Result<()> {
let assigned_ids = self.get_reassigned_ids()?;

// Update doc_id_mapping with reassigned IDs
let tmp_id_provider = self.doc_id_mapping.clone();
for (id, doc_id) in tmp_id_provider.into_iter().enumerate() {
let new_id = assigned_ids.get(id).ok_or(anyhow!(
"id in id_provider {} is larger than size of vectors",
id
))?;
self.doc_id_mapping[*new_id as usize] = doc_id;
}

// Build reverse assigned ids
let mut reverse_assigned_ids = vec![-1; self.doc_id_mapping.len()];
for (i, id) in assigned_ids.iter().enumerate() {
reverse_assigned_ids[*id as usize] = i as i32;
}

// Put the vectors to their reassigned places
let vector_storage_config = self.vectors.config();
let mut new_vector_storage = Box::new(FileBackedAppendableVectorStorage::<f32>::new(
temp_dir.clone(),
vector_storage_config.memory_threshold,
vector_storage_config.file_size,
vector_storage_config.num_features,
));

for i in 0..reverse_assigned_ids.len() {
let mapped_id = reverse_assigned_ids[i];
let vector = self.vectors.get(mapped_id as u32).unwrap();
new_vector_storage
.append(vector)
.unwrap_or_else(|_| panic!("append failed"));
}

self.vectors = new_vector_storage;
Ok(())
}

pub fn cleanup(&mut self) -> Result<()> {
let vectors_path = format!("{}/builder_vector_storage", self.config.base_directory);
let centroids_path = format!("{}/builder_centroid_storage", self.config.base_directory);
Expand Down Expand Up @@ -1153,6 +1193,82 @@ mod tests {
assert_eq!(assigned_ids[23], 23);
}

#[test]
fn test_ivf_builder_reindex() {
    // Every vector is one-dimensional with a value equal to its insertion
    // index, and its doc id is that index plus 100. Reading a slot after
    // reindexing therefore reveals which original vector ended up there.
    const NUM_VECTORS: usize = 22;

    let scratch = tempdir::TempDir::new("ivf_builder_reindex_test")
        .expect("Failed to create temporary directory");
    let base_directory = scratch
        .path()
        .to_str()
        .expect("Failed to convert temporary directory path to string")
        .to_string();

    let config = IvfBuilderConfig {
        max_iteration: 1000,
        batch_size: 4,
        num_clusters: 4,
        num_data_points: NUM_VECTORS,
        max_clusters_per_vector: 2,
        distance_threshold: 0.1,
        base_directory: base_directory.clone(),
        memory_size: 1024,
        file_size: 4096,
        num_features: 1,
        tolerance: 0.0,
        max_posting_list_size: usize::MAX,
    };
    let mut builder = IvfBuilder::new(config).expect("Failed to create builder");

    for idx in 0..NUM_VECTORS {
        let doc_id = idx as u64 + 100;
        builder
            .add_vector(doc_id, vec![idx as f32])
            .expect("Vector should be added");
    }

    // Hand-crafted posting lists that drive the id reassignment.
    assert!(builder.add_posting_list(&vec![11, 12, 13]).is_ok());
    assert!(builder.add_posting_list(&vec![0, 2, 4, 6, 8, 20]).is_ok());
    assert!(builder.add_posting_list(&vec![9, 18, 20]).is_ok());
    assert!(builder.add_posting_list(&vec![14, 15, 16, 18]).is_ok());
    assert!(builder.add_posting_list(&vec![1, 3, 5, 7, 18, 20]).is_ok());
    assert!(builder.add_posting_list(&vec![10, 15, 21]).is_ok());
    assert!(builder.add_posting_list(&vec![10, 15, 17, 19]).is_ok());

    builder
        .reindex(base_directory.clone())
        .expect("Failed to reindex");

    // Expected layout of the vector storage after reindexing.
    let expected_vectors: [f32; NUM_VECTORS] = [
        10.0, 14.0, 15.0, 1.0, 3.0, 5.0, 7.0, 9.0, 16.0, 18.0, 0.0, 2.0, 4.0, 6.0, 8.0, 20.0,
        11.0, 12.0, 13.0, 21.0, 17.0, 19.0,
    ];
    for (slot, expected_value) in expected_vectors.iter().enumerate() {
        let stored = builder
            .vectors
            .get(slot as u32)
            .expect(&format!("Failed to retrieve vector #{}", slot));
        assert_eq!(stored[0], *expected_value);
    }

    // The doc id mapping must have been permuted the same way (offset by 100).
    let expected_doc_ids: [u64; NUM_VECTORS] = [
        10, 14, 15, 1, 3, 5, 7, 9, 16, 18, 0, 2, 4, 6, 8, 20, 11, 12, 13, 21, 17, 19,
    ];
    assert_eq!(builder.doc_id_mapping.len(), NUM_VECTORS);
    for (slot, actual_doc_id) in builder.doc_id_mapping.iter().enumerate() {
        assert_eq!(*actual_doc_id, expected_doc_ids[slot] + 100);
    }
}

#[test]
fn test_ivf_builder() {
let temp_dir = tempdir::TempDir::new("ivf_builder_test")
Expand Down
6 changes: 3 additions & 3 deletions rs/index/src/ivf/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mod tests {

assert!(builder.build().is_ok());

assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

let reader = IvfReader::new(base_directory.clone());
let index = reader.read().expect("Failed to read index file");
Expand Down Expand Up @@ -200,7 +200,7 @@ mod tests {
}

assert!(builder.build().is_ok());
assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

let reader = IvfReader::new(base_directory.clone());
let index = reader.read().expect("Failed to read index file");
Expand All @@ -213,7 +213,7 @@ mod tests {
assert!(posting_list.is_ok());
let posting_list = posting_list.unwrap();

// It's possible that the posting list size is more than max_posting_list_size,
// It's possible that the posting list size is more than max_posting_list_size,
// but it should be less than 2x.
assert!(posting_list.len() <= 20);
}
Expand Down
18 changes: 15 additions & 3 deletions rs/index/src/ivf/writer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fs::{remove_file, File};
use std::fs::{create_dir_all, remove_file, File};
use std::io::{BufWriter, Write};

use anyhow::{anyhow, Context, Result};
use log::debug;
use utils::io::{append_file_to_writer, wrap_write};

use crate::ivf::builder::IvfBuilder;
Expand All @@ -16,7 +17,18 @@ impl IvfWriter {
Self { base_directory }
}

pub fn write(&self, ivf_builder: &mut IvfBuilder) -> Result<()> {
pub fn write(&self, ivf_builder: &mut IvfBuilder, reindex: bool) -> Result<()> {
if reindex {
let temp_dir = format!("{}/temp", self.base_directory);
create_dir_all(&temp_dir).context("failed to create temp directory")?;

// Reindex the vectors for efficient lookup
ivf_builder
.reindex(temp_dir)
.context("failed to reindex during write")?;
debug!("Finish reindexing");
}

let num_features = ivf_builder.config().num_features;
let num_clusters = ivf_builder.centroids().len();
let num_vectors = ivf_builder.vectors().len();
Expand Down Expand Up @@ -374,7 +386,7 @@ mod tests {
assert_eq!(builder.doc_id_mapping().len(), 1000);

assert!(builder.build().is_ok());
assert!(writer.write(&mut builder).is_ok());
assert!(writer.write(&mut builder, false).is_ok());

// Check if files were created and removed correctly
assert!(fs::metadata(format!("{}/vectors", base_directory)).is_ok());
Expand Down
2 changes: 1 addition & 1 deletion rs/index_writer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub enum IndexType {
pub struct BaseConfig {
pub output_path: String,
pub dimension: usize,
pub reindex: bool,

// Vector storage parameters
pub max_memory_size: usize,
Expand All @@ -35,7 +36,6 @@ pub struct HnswConfig {
pub num_layers: u8,
pub max_num_neighbors: usize,
pub ef_construction: u32,
pub reindex: bool,

// Quantizer parameters
pub quantizer_type: QuantizerType,
Expand Down
13 changes: 7 additions & 6 deletions rs/index_writer/src/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl IndexWriter {

info!("Start writing index");
let hnsw_writer = HnswWriter::new(hnsw_directory);
hnsw_writer.write(&mut hnsw_builder, index_builder_config.hnsw_config.reindex)?;
hnsw_writer.write(&mut hnsw_builder, index_builder_config.base_config.reindex)?;

// Cleanup tmp directory. It's ok to fail
std::fs::remove_dir_all(&pg_temp_dir).unwrap_or_default();
Expand Down Expand Up @@ -157,7 +157,7 @@ impl IndexWriter {

info!("Start writing index");
let ivf_writer = IvfWriter::new(ivf_directory);
ivf_writer.write(&mut ivf_builder)?;
ivf_writer.write(&mut ivf_builder, index_builder_config.base_config.reindex)?;

// Cleanup tmp directory. It's ok to fail
ivf_builder.cleanup()?;
Expand Down Expand Up @@ -256,11 +256,11 @@ impl IndexWriter {

info!("Start writing HNSW index for centroids");
let hnsw_writer = HnswWriter::new(hnsw_directory);
hnsw_writer.write(&mut hnsw_builder, hnsw_config.reindex)?;
hnsw_writer.write(&mut hnsw_builder, index_writer_config.base_config.reindex)?;

info!("Start writing IVF index");
let ivf_writer = IvfWriter::new(ivf_directory);
ivf_writer.write(&mut ivf_builder)?;
ivf_writer.write(&mut ivf_builder, index_writer_config.base_config.reindex)?;
ivf_builder.cleanup()?;

// Finally, write the index writer config
Expand Down Expand Up @@ -379,6 +379,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Hnsw,
Expand All @@ -387,7 +388,6 @@ mod tests {
num_layers: 2,
max_num_neighbors: 10,
ef_construction: 100,
reindex: false,
quantizer_type: QuantizerType::ProductQuantizer,
subvector_dimension: 2,
num_bits: 2,
Expand Down Expand Up @@ -447,6 +447,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Ivf,
Expand Down Expand Up @@ -509,6 +510,7 @@ mod tests {
let base_config = BaseConfig {
output_path: base_directory.clone(),
dimension,
reindex: false,
max_memory_size: 1024 * 1024 * 1024, // 1 GB
file_size: 1024 * 1024 * 1024, // 1 GB
index_type: IndexType::Spann,
Expand All @@ -517,7 +519,6 @@ mod tests {
num_layers: 2,
max_num_neighbors: 10,
ef_construction: 100,
reindex: false,
quantizer_type: QuantizerType::ProductQuantizer,
subvector_dimension: 2,
num_bits: 2,
Expand Down
4 changes: 2 additions & 2 deletions rs/index_writer/src/scripts/write_index_writer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
use std::fs::File;
use std::io::Write;

use index_writer::config::{BaseConfig, HnswConfig, SpannConfigWithBase, IvfConfig, QuantizerType};
use index_writer::config::{BaseConfig, HnswConfig, IvfConfig, QuantizerType, SpannConfigWithBase};

fn main() -> std::io::Result<()> {
let mut base_config = BaseConfig::default();
base_config.reindex = false;
base_config.dimension = 128;
base_config.output_path = "NONE".to_string();
base_config.max_memory_size = 1024 * 1024 * 1024; // 1 GB
Expand All @@ -26,7 +27,6 @@ fn main() -> std::io::Result<()> {
hnsw_config.num_layers = 4;
hnsw_config.max_num_neighbors = 32;
hnsw_config.ef_construction = 200;
hnsw_config.reindex = false;

let mut config = SpannConfigWithBase::default();
config.base_config = base_config;
Expand Down
Loading