diff --git a/Cargo.lock b/Cargo.lock index 3222798..e3b2840 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,6 +579,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "demo" +version = "0.1.0" +dependencies = [ + "anyhow", + "env_logger", + "hdf5-metno", + "log", + "ndarray", + "proto", + "tokio", + "tonic", +] + [[package]] name = "dirs" version = "3.0.2" diff --git a/Cargo.toml b/Cargo.toml index ca5478e..e0ac3b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] -members = [ +members = [ + "rs/demo", "rs/aggregator", "rs/cli", "rs/compression", diff --git a/py/create_1m_hdf5.py b/py/create_1m_hdf5.py new file mode 100644 index 0000000..fcae8e9 --- /dev/null +++ b/py/create_1m_hdf5.py @@ -0,0 +1,32 @@ +import h5py +import numpy as np + +def create_hdf5(input_path, output_path, batch_size=100000): + # Create HDF5 file + with h5py.File(output_path, 'w') as hf: + # Initialize dataset with unknown size + dataset = hf.create_dataset('embeddings', + shape=(0, 768), + maxshape=(None, 768), + dtype='float32') + + # Process file in batches + with open(input_path, 'r') as f: + while True: + # Read batch of lines + lines = [f.readline() for _ in range(batch_size)] + if not lines[0]: # End of file + break + + # Parse lines to numpy array + embeddings = np.array([np.fromstring(line.strip('[]\n'), sep=',') + for line in lines if line.strip()]) + + # Resize dataset and append new data + dataset.resize(dataset.shape[0] + embeddings.shape[0], axis=0) + dataset[-embeddings.shape[0]:] = embeddings + +if __name__ == '__main__': + input_file = '/mnt/muopdb/raw/1m_embeddings.txt' + output_file = '/mnt/muopdb/1m_embeddings.hdf5' + create_hdf5(input_file, output_file) diff --git a/py/demo_insert_search.py b/py/demo_insert_search.py index 2cdb12a..ad3f534 100644 --- a/py/demo_insert_search.py +++ b/py/demo_insert_search.py @@ -1,166 +1,56 @@ import muopdb_client as mp import google.generativeai as genai import os - -SENTENCES = [ - "The sun rises in the 
east and sets in the west.", - "Rainforests are home to millions of species of plants and animals.", - "Climate change is causing glaciers to melt at an alarming rate.", - "Bees play a crucial role in pollinating crops.", - "The ocean covers more than 70% of the Earth's surface.", - "Planting trees can help reduce carbon dioxide in the atmosphere.", - "Hurricanes are becoming more intense due to global warming.", - "The Grand Canyon is one of the most breathtaking natural wonders.", - "Recycling helps reduce waste in landfills.", - "Solar energy is a clean and renewable source of power.", - "Artificial intelligence is transforming industries worldwide.", - "Smartphones have become an essential part of modern life.", - "Electric vehicles are gaining popularity as an eco-friendly alternative.", - "Social media platforms connect people across the globe.", - "Cybersecurity is critical to protecting personal data online.", - "Virtual reality allows users to experience immersive environments.", - "3D printing is revolutionizing manufacturing processes.", - "Blockchain technology is the backbone of cryptocurrencies.", - "Robots are increasingly being used in healthcare and surgery.", - "The internet has made information accessible to billions of people.", - "Regular exercise improves both physical and mental health.", - "Drinking enough water is essential for staying hydrated.", - "A balanced diet includes fruits, vegetables, and whole grains.", - "Meditation can help reduce stress and anxiety.", - "Getting enough sleep is crucial for overall well-being.", - "Smoking is a leading cause of lung cancer.", - "Yoga combines physical postures with mindfulness practices.", - "Vaccines have eradicated many deadly diseases.", - "Walking for 30 minutes a day can boost cardiovascular health.", - "Mental health is just as important as physical health.", - "Paris is known as the 'City of Love' and is famous for the Eiffel Tower.", - "Japan is a country that blends tradition with 
modernity.", - "The Great Wall of China is one of the Seven Wonders of the World.", - "Italy is renowned for its delicious cuisine and rich history.", - "Traveling broadens your perspective and introduces you to new cultures.", - "The Northern Lights are a spectacular natural phenomenon.", - "India is a diverse country with many languages and traditions.", - "Australia is home to unique wildlife like kangaroos and koalas.", - "The pyramids of Egypt are a testament to ancient engineering.", - "New York City is often called 'The City That Never Sleeps.'", - "Reading books enhances vocabulary and critical thinking skills.", - "Lifelong learning is key to personal and professional growth.", - "Online courses have made education more accessible.", - "Mathematics is the foundation of many scientific disciplines.", - "Learning a new language can open up career opportunities.", - "History teaches us valuable lessons about the past.", - "Creativity is just as important as technical skills in many fields.", - "Teachers play a vital role in shaping young minds.", - "STEM education focuses on science, technology, engineering, and math.", - "Libraries are a valuable resource for knowledge and research.", - "Pizza originated in Italy and is now popular worldwide.", - "Sushi is a traditional Japanese dish made with rice and seafood.", - "Cooking at home is often healthier than eating out.", - "Chocolate is made from cocoa beans and is loved by many.", - "Spices like turmeric and cumin add flavor to dishes.", - "Breakfast is often called the most important meal of the day.", - "Vegetarian diets exclude meat but include plant-based foods.", - "Baking requires precise measurements and techniques.", - "Coffee is one of the most consumed beverages in the world.", - "Food waste is a major issue that needs to be addressed.", - "Leonardo da Vinci painted the famous Mona Lisa.", - "Music has the power to evoke emotions and bring people together.", - "Shakespeare is considered one of the 
greatest playwrights in history.", - "Movies are a popular form of entertainment worldwide.", - "The Beatles revolutionized the music industry in the 1960s.", - "Photography captures moments and tells stories visually.", - "Broadway is known for its spectacular theatrical performances.", - "Graffiti is a form of street art that expresses creativity.", - "Ballet is a classical dance form that requires discipline and skill.", - "Streaming platforms have changed the way we consume media.", - "The Earth orbits the sun in approximately 365 days.", - "Albert Einstein developed the theory of relativity.", - "Black holes are regions of space with extremely strong gravity.", - "DNA carries the genetic information of living organisms.", - "The Hubble Telescope has captured stunning images of distant galaxies.", - "Gravity is the force that keeps planets in orbit.", - "The periodic table organizes all known chemical elements.", - "Mars is often called the 'Red Planet' due to its color.", - "Photosynthesis is the process by which plants make their food.", - "The speed of light is approximately 299,792 kilometers per second.", - "Entrepreneurship involves taking risks to start a business.", - "Investing in stocks can yield high returns over time.", - "Marketing is essential for promoting products and services.", - "Small businesses are the backbone of many economies.", - "Cryptocurrencies like Bitcoin are decentralized digital currencies.", - "A good credit score is important for securing loans.", - "E-commerce has transformed the way people shop.", - "Networking is crucial for career advancement.", - "Inflation affects the purchasing power of money.", - "Sustainability is becoming a key focus for many companies.", - "Kindness is a universal language that everyone understands.", - "Time management is essential for achieving goals.", - "Laughter is often called the best medicine.", - "Volunteering can make a positive impact on your community.", - "Patience is a virtue that leads 
to better decision-making.", - "Honesty builds trust in relationships.", - "Curiosity drives innovation and discovery.", - "Gratitude can improve mental health and happiness.", - "Challenges help us grow and become stronger.", - "Life is a journey filled with ups and downs.", -] - -def insert_all_documents(muopdb_client, collection_name, docs): - print("Inserting documents...") - id = 1 - for sentence in docs: - result = genai.embed_content( - model="models/text-embedding-004", - content=sentence) +import ollama +import logging +import h5py + +def insert_all_documents(muopdb_client: mp.IndexServerClient, collection_name, embeddings): + logging.info("Inserting documents...") + batch_size = 10000 + total_embeddings = len(embeddings) + + for start_idx in range(0, total_embeddings, batch_size): + end_idx = min(start_idx + batch_size, total_embeddings) + batch_embeddings = embeddings[start_idx:end_idx] + batch_ids = list(range(start_idx + 1, end_idx + 1)) + + # Flatten the batch embeddings into a single list of floats + flattened_vectors = [float(value) for embedding in batch_embeddings for value in embedding] muopdb_client.insert( collection_name=collection_name, - ids=[id], - vectors=result["embedding"] + ids=batch_ids, + vectors=flattened_vectors ) - if id % 10 == 0: - print(f"Inserted document up to id {id}") - id += 1 + + if end_idx % (batch_size * 10) == 0 or end_idx == total_embeddings: + logging.info(f"Inserted documents up to id {end_idx}") + + logging.info("Start indexing documents...") muopdb_client.flush(collection_name=collection_name) + logging.info("Indexing documents completed. 
Documents are ready to be queried.") # main function def main(): - with open(os.path.expanduser('~/.secrets/gemini.config'), 'r') as f: - api_key = f.read().strip() - genai.configure(api_key=api_key) - - query = "personal development" - print(f"Query: {query}") - query_vector = genai.embed_content( - model="models/text-embedding-004", - content=query)["embedding"] - - # Before inserting documents, there shouldn't be any results - muopdb_client = mp.IndexServerClient() - search_response = muopdb_client.search( - index_name="test-collection-1", - vector=query_vector, - top_k=5, - ef_construction=50, - record_metrics=False + # Configure logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() + ] ) - print("Before inserting documents, number of results: ", len(search_response.ids)) + + muopdb_client = mp.IndexServerClient() + logging.info("=========== Inserting documents ===========") - print("=========== Inserting documents ===========") - insert_all_documents(muopdb_client, "test-collection-1", SENTENCES) - print("=========== Inserted all documents ===========") + # Read the source embedding file + with h5py.File("/mnt/muopdb/raw/1m_embeddings.hdf5", "r") as f: + embeddings = f['embeddings'][:] - # After inserting documents, there should be results - print("Query: ", query) - search_response = muopdb_client.search( - index_name="test-collection-1", - vector=query_vector, - top_k=5, - ef_construction=50, - record_metrics=False - ) - print("After inserting documents, number of results: ", len(search_response.ids)) - for id in search_response.ids: - print(f"RESULT: {SENTENCES[id - 1]}") + # Insert the embeddings into MuopDB + insert_all_documents(muopdb_client, "test-collection-1", embeddings) + logging.info("=========== Inserted all documents ===========") if __name__ == "__main__": diff --git a/py/demo_search.py b/py/demo_search.py new file mode 100644 index 0000000..3287282 --- 
/dev/null +++ b/py/demo_search.py @@ -0,0 +1,30 @@ +import muopdb_client as mp +import ollama +import time + +if __name__ == "__main__": + # Example usage for IndexServer + muopdb_client = mp.IndexServerClient() + query = "personal career development" + query_vector = ollama.embeddings(model='nomic-embed-text', prompt=query)["embedding"] + + # Read back the raw data to print the responses + with open("/mnt/muopdb/raw/1m_sentences.txt", "r") as f: + sentences = [line.strip() for line in f] + + start = time.time() + search_response = muopdb_client.search( + index_name="test-collection-1", + vector=query_vector, + top_k=5, + ef_construction=50, + record_metrics=False + ) + end = time.time() + print(f"Time taken for search: {end - start} seconds") + + print(f"Number of results: {len(search_response.ids)}") + print("================") + for id in search_response.ids: + print(f"RESULT: {sentences[id - 1]}") + print("================") diff --git a/py/embed_1m_sentences.py b/py/embed_1m_sentences.py new file mode 100644 index 0000000..5679ba8 --- /dev/null +++ b/py/embed_1m_sentences.py @@ -0,0 +1,19 @@ +import ollama +import time +import sys + +if __name__ == "__main__": + # Read the first args + input_file = sys.argv[1] + with open(input_file, "r") as f: + sentences = f.readlines() + + start = time.time() + output_file = f"{input_file}_embeddings.txt" + with open(output_file, "w") as f: + for sentence in sentences: + result = ollama.embeddings(model='nomic-embed-text', prompt=sentence) + f.write(f"{result['embedding']}\n") + + end = time.time() + print(f"Time taken: {end - start} seconds") diff --git a/py/generate_1m_sentences.py b/py/generate_1m_sentences.py new file mode 100644 index 0000000..69cd24d --- /dev/null +++ b/py/generate_1m_sentences.py @@ -0,0 +1,9 @@ +import datasets + +if __name__ == "__main__": + ds = datasets.load_dataset("agentlans/high-quality-english-sentences") + train = ds["train"] + with open("/tmp/1m_sentences.txt", "w") as f: + for ln in train: 
+ f.write(ln["text"] + "\n") + f.flush() diff --git a/py/muopdb_client.py b/py/muopdb_client.py index 6d9fac7..5d7793a 100644 --- a/py/muopdb_client.py +++ b/py/muopdb_client.py @@ -4,7 +4,7 @@ """ Make sure to build the proto files first: -pip install grpcio protobuf grpcio-tools +pip install grpcio protobuf grpcio-tools ollama python3 -m grpc.tools.protoc -I=rs/proto/proto --python_out=py/ --grpc_python_out=py/ rs/proto/proto/muopdb.proto """ diff --git a/py/read_hdf5.py b/py/read_hdf5.py new file mode 100644 index 0000000..e149b1d --- /dev/null +++ b/py/read_hdf5.py @@ -0,0 +1,15 @@ +import h5py + +def read_first_and_last_five_points(): + with h5py.File("/mnt/muopdb/raw/1m_embeddings.hdf5", "r") as f: + # Assuming the dataset is named 'embeddings' + dataset = f['embeddings'] + first_five = dataset[:5] + last_five = dataset[-5:] + print("First 5 points:") + print(first_five) + print("\nLast 5 points:") + print(last_five) + +if __name__ == "__main__": + read_first_and_last_five_points() \ No newline at end of file diff --git a/rs/demo/Cargo.toml b/rs/demo/Cargo.toml new file mode 100644 index 0000000..a59a81d --- /dev/null +++ b/rs/demo/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "demo" +version = "0.1.0" +edition = "2024" + +[dependencies] +tonic.workspace = true +hdf5.workspace = true +log.workspace = true +env_logger.workspace = true +anyhow.workspace = true +proto.workspace = true +tokio.workspace = true +ndarray.workspace = true diff --git a/rs/demo/src/main.rs b/rs/demo/src/main.rs new file mode 100644 index 0000000..b526ff1 --- /dev/null +++ b/rs/demo/src/main.rs @@ -0,0 +1,74 @@ +use std::time::Instant; + +use anyhow::{Context, Result}; +use hdf5::File; +use log::{LevelFilter, info}; +use proto::muopdb::index_server_client::IndexServerClient; +use proto::muopdb::{FlushRequest, InsertRequest}; +use ndarray::s; + +#[tokio::main] +async fn main() -> Result<()> { + // Configure logging + env_logger::Builder::new() + .filter_level(LevelFilter::Info) + 
.format_timestamp_millis() + .init(); + + let addr = String::from("http://127.0.0.1:9002"); + // Create gRPC client + let mut client = IndexServerClient::connect(addr) + .await + .context("Failed to connect to IndexServer")?; + + info!("=========== Inserting documents ==========="); + + // Read embeddings from HDF5 file + let file = File::open("/mnt/muopdb/raw/1m_embeddings.hdf5")?; + let dataset = file.dataset("embeddings")?; + let embeddings = dataset.read_2d::<f64>()?; + + // Insert embeddings in batches into MuopDB + let batch_size = 100_000; + let total_embeddings = embeddings.nrows(); + let mut start_idx = 0; + + let mut start = Instant::now(); + while start_idx < total_embeddings { + let end_idx = (start_idx + batch_size).min(total_embeddings); + let batch = &embeddings.slice(s![start_idx..end_idx, ..]); + + let mut vectors = Vec::with_capacity(batch.len()); + for row in batch.rows() { + vectors.extend(row.iter().map(|&v| v as f32)); + } + + // Generate IDs + let ids: Vec<u64> = (start_idx + 1..=end_idx).map(|i| i as u64).collect(); + + // Create and send insert request + let request = tonic::Request::new(InsertRequest { + collection_name: "test-collection-1".to_string(), + ids: ids.clone(), + vectors, + }); + + client.insert(request).await?; + start_idx = end_idx; + } + + let mut duration = start.elapsed(); + info!("Inserted all documents in {:?}", duration); + + // Done inserting, now start indexing. 
+ info!("Start indexing documents..."); + start = Instant::now(); + let request = tonic::Request::new(FlushRequest { + collection_name: "test-collection-1".to_string(), + }); + client.flush(request).await?; + duration = start.elapsed(); + info!("Indexing documents completed in {:?}", duration); + + Ok(()) +} diff --git a/rs/index_server/src/index_server.rs b/rs/index_server/src/index_server.rs index b48bf06..15dedc2 100644 --- a/rs/index_server/src/index_server.rs +++ b/rs/index_server/src/index_server.rs @@ -3,7 +3,7 @@ use std::vec; use index::index::Searchable; use index::utils::SearchContext; -use log::info; +use log::{debug, info}; use proto::muopdb::index_server_server::IndexServer; use proto::muopdb::{ FlushRequest, FlushResponse, InsertRequest, InsertResponse, SearchRequest, SearchResponse, @@ -87,6 +87,7 @@ impl IndexServer for IndexServerImpl { &self, request: tonic::Request<InsertRequest>, ) -> Result<tonic::Response<InsertResponse>, tonic::Status> { + let start = std::time::Instant::now(); let req = request.into_inner(); let collection_name = req.collection_name; let ids = req.ids; @@ -117,6 +118,10 @@ impl IndexServer for IndexServerImpl { collection.insert(*id, vector).unwrap() }); + // log the duration + let end = std::time::Instant::now(); + let duration = end.duration_since(start); + debug!("Inserted {} vectors in {:?}", ids.len(), duration); Ok(tonic::Response::new(InsertResponse { inserted_ids: ids })) } None => Err(tonic::Status::new( @@ -130,6 +135,7 @@ impl IndexServer for IndexServerImpl { &self, request: tonic::Request<FlushRequest>, ) -> Result<tonic::Response<FlushResponse>, tonic::Status> { + let start = std::time::Instant::now(); let req = request.into_inner(); let collection_name = req.collection_name; @@ -140,6 +146,10 @@ impl IndexServer for IndexServerImpl { .get_collection(&collection_name) .await; + let end = std::time::Instant::now(); + let duration = end.duration_since(start); + debug!("Retrieved collection {} in {:?}", collection_name, duration); + match collection_opt { Some(collection) => { collection.flush().unwrap();