From 3ef6c1b4080454d8ef6bb4b3524f35670e6bad4f Mon Sep 17 00:00:00 2001
From: Albert Sawczyn
Date: Wed, 18 Sep 2024 11:21:51 +0200
Subject: [PATCH] feat: save to multiple files

---
 scripts/nsa/save_pages_from_db_to_file.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/scripts/nsa/save_pages_from_db_to_file.py b/scripts/nsa/save_pages_from_db_to_file.py
index 0f63a53..1a9fcc0 100644
--- a/scripts/nsa/save_pages_from_db_to_file.py
+++ b/scripts/nsa/save_pages_from_db_to_file.py
@@ -11,24 +11,30 @@ DB_URI = "mongodb://localhost:27017/"
 
 
 
-def fetch_documents(collection, batch_size=1000):
+def fetch_documents(collection, batch_size=5000):
     cursor = collection.find().batch_size(batch_size)
     for doc in cursor:
         doc["_id"] = str(doc["_id"])  # Convert ObjectId to string
         yield doc
 
 
-def write_to_parquet_in_chunks(file_path, collection, batch_size=1000):
+def write_to_parquet_in_chunks(file_path, collection, batch_size=5000, chunk_size=50000):
     buffer = []
+    chunk_index = 0
+
     for doc in tqdm(fetch_documents(collection, batch_size)):
         buffer.append(doc)
-        if len(buffer) >= batch_size:
+        if len(buffer) >= chunk_size:
             df = pd.DataFrame(buffer)
-            df.to_parquet(file_path, engine="pyarrow", compression="snappy", append=True)
+            chunk_file = file_path.parent / f"{file_path.stem}_chunk_{chunk_index}.parquet"
+            df.to_parquet(chunk_file, engine="pyarrow", compression="snappy")
             buffer = []
+            chunk_index += 1
+
     if buffer:
         df = pd.DataFrame(buffer)
-        df.to_parquet(file_path, engine="pyarrow", compression="snappy", append=True)
+        chunk_file = file_path.parent / f"{file_path.stem}_chunk_{chunk_index}.parquet"
+        df.to_parquet(chunk_file, engine="pyarrow", compression="snappy")
 
 
 def main(
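
Note: the patch switches from appending to one parquet file to writing a series of numbered chunk files. A minimal sketch of how those chunks could be read back into a single DataFrame; the helper name read_chunks and the glob pattern are assumptions for illustration, not part of this patch.

# Sketch: reassemble the per-chunk parquet files written by write_to_parquet_in_chunks.
# Assumes the naming scheme used in the patch: <stem>_chunk_<n>.parquet next to file_path.
from pathlib import Path

import pandas as pd


def read_chunks(file_path: Path) -> pd.DataFrame:
    # Sort numerically by the chunk index so chunk_10 does not sort before chunk_2.
    chunk_files = sorted(
        file_path.parent.glob(f"{file_path.stem}_chunk_*.parquet"),
        key=lambda p: int(p.stem.rsplit("_", 1)[1]),
    )
    # Concatenate all chunks; ignore_index rebuilds a clean RangeIndex.
    return pd.concat((pd.read_parquet(f) for f in chunk_files), ignore_index=True)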