diff --git a/src/pypi_data/cli.py b/src/pypi_data/cli.py
index b4146fec..3fbecd2a 100644
--- a/src/pypi_data/cli.py
+++ b/src/pypi_data/cli.py
@@ -158,11 +158,13 @@ def merge_datasets(
     repo_path: Path,
     output: Path,
     max_buffer_size: Annotated[str, typer.Option()] = "10GB",
+    target_size: Annotated[str, typer.Option()] = "1.8GB",
 ):
     with open_path(repo_path, mode="rb") as fd:
         repos = Repos.model_validate_json(fd.read()).root
     max_buffer_size = pydantic.RootModel[ByteSize].model_validate(max_buffer_size).root
-    asyncio.run(combine_parquet(repos, output, max_buffer_size))
+    target_size = pydantic.RootModel[ByteSize].model_validate(target_size).root
+    asyncio.run(combine_parquet(repos, output, max_buffer_size, target_size))


 async def resolve_dataset_redirects(
diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index 00e40947..11cfb10f 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -18,7 +18,6 @@

 log = structlog.get_logger()

-TARGET_SIZE = 1024 * 1024 * 1024 * 1.8  # 1.8 GB
 IO_BUFFER_SIZE = 1024 * 1024 * 50  # 50 MB


@@ -27,6 +26,7 @@ def append_buffer(
     writer: pq.ParquetWriter,
     batch: RecordBatch,
     roll_up_path: Path,
+    target_size: int,
 ) -> bool:
     initial_size = roll_up_path.stat().st_size
     writer.write_batch(batch)
@@ -35,12 +35,12 @@
     written_size = end_size - initial_size
     log.info(
         f"Wrote {batch.num_rows} rows "
-        f"Batch Size: {ByteSize(batch.nbytes).human_readable(decimal=True)} MB "
-        f"Initial Size: {ByteSize(initial_size).human_readable(decimal=True)} MB "
-        f"End Size: {ByteSize(end_size).human_readable(decimal=True)} MB "
-        f"Written: {ByteSize(written_size).human_readable(decimal=True)} MB"
+        f"Batch Size: {ByteSize(batch.nbytes).human_readable(decimal=True)} "
+        f"Initial Size: {ByteSize(initial_size).human_readable(decimal=True)} "
+        f"End Size: {ByteSize(end_size).human_readable(decimal=True)} "
+        f"Written: {ByteSize(written_size).human_readable(decimal=True)}"
     )
-    return end_size >= TARGET_SIZE
+    return end_size >= target_size


 def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int:
@@ -61,9 +61,7 @@ async def fill_buffer(
         start_time_ns = time.perf_counter_ns()

         buffer_size = buffer_mem_size(buffer)
-        log.info(
-            f"Buffer size: {ByteSize(buffer_size).human_readable(decimal=True)} MB"
-        )
+        log.info(f"Buffer size: {ByteSize(buffer_size).human_readable(decimal=True)}")
         if buffer_size >= max_buffer_size:
             log.info(f"Buffer filled with {len(buffer)} entries")
             break
@@ -129,7 +127,10 @@ def hash_parquet_keys(keys: list[tuple[int, str]]) -> str:


 async def combine_parquet(
-    repositories: list[CodeRepository], directory: Path, max_buffer_size: ByteSize
+    repositories: list[CodeRepository],
+    directory: Path,
+    max_buffer_size: ByteSize,
+    target_size: ByteSize,
 ):
     directory.mkdir(exist_ok=True)

@@ -143,10 +144,10 @@ async def combine_parquet(
         max_buffer_size,  # 10 GB
     )
     log.info(
-        f"Total system memory: {ByteSize(total_memory).human_readable(decimal=True)} MB"
+        f"Total system memory: {ByteSize(total_memory).human_readable(decimal=True)}"
     )
     log.info(
-        f"Configured buffer size: {ByteSize(max_buffer_size).human_readable(decimal=True)} MB"
+        f"Configured buffer size: {ByteSize(max_buffer_size).human_readable(decimal=True)}"
     )

     async with httpx.AsyncClient(follow_redirects=True) as client:
@@ -172,7 +173,7 @@
                 write_statistics=True,
                 schema=first_buffer.schema,
             ) as writer:
-                append_buffer(fd, writer, first_buffer, roll_up_path)
+                append_buffer(fd, writer, first_buffer, roll_up_path, target_size)

                 while buffer or repositories:
                     if not buffer:
@@ -184,7 +185,7 @@
                         key, batch = buffer.popleft()
                         keys.append(key)

-                        if append_buffer(fd, writer, batch, roll_up_path):
+                        if append_buffer(fd, writer, batch, roll_up_path, target_size):
                             break

            hashed_keys = hash_parquet_keys(keys)
@@ -193,7 +194,7 @@
            )

            log.info(
-                f"Finished batch {roll_up_count}, {len(keys)} batches, {final_path.name=} {ByteSize(final_path.stat().st_size).human_readable(decimal=True)} MB"
+                f"Finished batch {roll_up_count}, {len(keys)} batches, {final_path.name=} {ByteSize(final_path.stat().st_size).human_readable(decimal=True)}"
            )
            roll_up_count += 1

diff --git a/src/pypi_data/datasets.py b/src/pypi_data/datasets.py
index 165eeeb3..1b854abc 100644
--- a/src/pypi_data/datasets.py
+++ b/src/pypi_data/datasets.py
@@ -124,7 +124,7 @@ async def download_dataset(self, client: httpx.AsyncClient) -> bytes | None:
         if response is None:
             return None

         log.info(
-            f"Downloaded parquet file with size {ByteSize(len(response.content)).human_readable(decimal=True)} MB"
+            f"Downloaded parquet file with size {ByteSize(len(response.content)).human_readable(decimal=True)}"
         )
         return response.content
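
Note on the new --target-size option: it mirrors the existing --max-buffer-size handling, where typer receives the value as a plain string and pydantic converts human-readable size strings into a byte count. A standalone sketch of that conversion, assuming pydantic v2 (parse_size is a hypothetical helper name, not part of the diff):

    import pydantic
    from pydantic import ByteSize

    def parse_size(value: str) -> ByteSize:
        # RootModel[ByteSize] validates strings like "10GB", "1.8GB" or "512MiB"
        # and returns an int subclass holding the size in bytes.
        return pydantic.RootModel[ByteSize].model_validate(value).root

    assert parse_size("1.8GB") == 1_800_000_000
    assert parse_size("1KiB") == 1024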
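Note on the dropped " MB" suffixes in the log messages: ByteSize.human_readable() already appends a unit to the value it formats, so the hardcoded " MB" duplicated the unit and contradicted it whenever the value rendered as KB, GB, and so on. A minimal sketch of the behaviour, again assuming pydantic v2:

    from pydantic import ByteSize

    size = ByteSize(1_800_000_000)
    # decimal=True formats with decimal (SI) units; the unit is part of the output.
    print(size.human_readable(decimal=True))  # prints something like "1.8GB"
    print(size.human_readable())              # binary units, e.g. "1.7GiB"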