From 2917be0956b1c23609a1d0dcf5fc451f84b04e91 Mon Sep 17 00:00:00 2001
From: Tom Forbes
Date: Sat, 19 Oct 2024 22:35:05 +0100
Subject: [PATCH] ok

---
 src/pypi_data/combine_parquet.py | 38 +++++++++++++++++++++++++++-----
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index d3ba8941..70e08118 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -1,3 +1,4 @@
+import hashlib
 from pathlib import Path
 
 import httpx
@@ -13,7 +14,7 @@
 TARGET_SIZE = 1024 * 1024 * 1024 * 1.7  # 1.7 GB
 
 
-def append_buffer(writer: pq.ParquetWriter, batch: pa.RecordBatch, roll_up_path: Path) -> bool:
+def append_buffer(writer: pq.ParquetWriter, batch: RecordBatch, roll_up_path: Path) -> bool:
     log.info(f"Writing batch: {batch.num_rows=} {batch.nbytes / 1024 / 1024:.1f} MB")
     writer.write_batch(batch)
     writer.file_handle.flush()
@@ -22,12 +23,25 @@ def append_buffer(writer: pq.ParquetWriter, batch: pa.RecordBatch, roll_up_path:
     return size >= TARGET_SIZE
 
 
-async def fill_buffer(buffer: list[RecordBatch], client: httpx.AsyncClient, repo: CodeRepository, path: Path):
+async def fill_buffer(buffer: list[tuple[tuple[int, range], RecordBatch]], client: httpx.AsyncClient,
+                      repo: CodeRepository,
+                      path: Path):
     log.info(f"Downloading {repo.dataset_url}")
     await repo.download_dataset(client, path)
     log.info(f'Downloaded, reading {path}')
     table = pq.read_table(path, memory_map=True).combine_chunks()
-    buffer.extend(table.to_batches(max_chunksize=2_000_000))
+
+    start = 0
+    for idx, batch in enumerate(table.to_batches(max_chunksize=2_000_000)):
+        buffer.append(
+            ((repo.number, range(start, start + batch.num_rows)), batch)
+        )
+        start += batch.num_rows
+
+
+def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
+    combined = "-".join(f"{number}-{r.start}-{r.stop}" for number, r in keys)
+    return hashlib.sha256(combined.encode()).hexdigest()
 
 
 async def combine_parquet(repositories: list[CodeRepository], directory: Path):
@@ -37,7 +51,7 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
     repo_file = directory / f"repo.parquet"
     roll_up_count = 0
 
-    buffer: list[RecordBatch] = []
+    buffer: list[tuple[tuple[int, range], RecordBatch]] = []
 
     async with httpx.AsyncClient(follow_redirects=True) as client:
         while repositories:
@@ -46,7 +60,9 @@
 
             roll_up_path = directory / f"merged-{roll_up_count}.parquet"
 
-            first_buffer = buffer.pop(0)
+            keys = []
+            first_key, first_buffer = buffer.pop(0)
+            keys.append(first_key)
 
             with pq.ParquetWriter(roll_up_path,
                                   compression="zstd",
@@ -61,10 +77,20 @@
                         repo = repositories.pop(0)
                         await fill_buffer(buffer, client, repo, repo_file)
 
-                    batch = buffer.pop(0)
+                    key, batch = buffer.pop(0)
+                    keys.append(key)
                     if append_buffer(writer, batch, roll_up_path):
                         break
 
+            hashed_keys = hash_parquet_keys(keys)
+            final_path = roll_up_path.rename(
+                directory / f"merged-{roll_up_count}-{hashed_keys[:6]}.parquet"
+            )
+
+            log.info(
+                f"Finished batch {roll_up_count}, {len(keys)} keys, {final_path.name=} {final_path.stat().st_size / 1024 / 1024:.1f} MB"
+            )
+
             roll_up_count += 1
 
     # for repo in repositories:
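
Note (not part of the commit): a minimal, self-contained sketch of the key-hashing
scheme the patch introduces. hash_parquet_keys is reproduced from the diff above
(with the range unpacked via .start/.stop, since a range of arbitrary length cannot
be destructured into two names); the sample keys and repository numbers are
hypothetical, chosen only to show how each rolled-up file gets a content-derived
name suffix.

    import hashlib

    def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
        # Each key is (repository number, row range contributed by one batch),
        # so the digest fingerprints exactly which rows went into a rolled-up file.
        combined = "-".join(f"{number}-{r.start}-{r.stop}" for number, r in keys)
        return hashlib.sha256(combined.encode()).hexdigest()

    # Hypothetical keys: two batches from repo 12, one from repo 13.
    keys = [
        (12, range(0, 2_000_000)),
        (12, range(2_000_000, 3_100_000)),
        (13, range(0, 850_000)),
    ]
    digest = hash_parquet_keys(keys)
    roll_up_count = 0
    # Six-character suffix, mirroring the rename step in combine_parquet above.
    print(f"merged-{roll_up_count}-{digest[:6]}.parquet")

Because the digest is derived only from (repo number, row range) pairs, re-running
the roll-up over the same inputs reproduces the same filename, which makes the
output files stable across runs.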