diff --git a/src/pypi_data/cli.py b/src/pypi_data/cli.py
index 9ed312a4..5cf3a158 100644
--- a/src/pypi_data/cli.py
+++ b/src/pypi_data/cli.py
@@ -164,7 +164,8 @@ def merge_datasets(
         repos = Repos.model_validate_json(fd.read()).root
     max_buffer_size = pydantic.RootModel[ByteSize].model_validate(max_buffer_size).root
     target_size = pydantic.RootModel[ByteSize].model_validate(target_size).root
-    asyncio.run(combine_parquet(repos, output, max_buffer_size, target_size))
+    # Debug failures...
+    asyncio.run(combine_parquet(repos[200:], output, max_buffer_size, target_size))
 
 
 async def resolve_dataset_redirects(
diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index 719f2e40..52e646c6 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -74,6 +74,7 @@ async def fill_buffer(
     max_buffer_size: int,
     client: httpx.AsyncClient,
     repositories: Deque[CodeRepository],
+    directory: Path,
 ) -> bool:
     while repositories:
         time_hashing_ns = 0
@@ -139,6 +140,7 @@ async def fill_buffer(
             f"iter={time_iterating_ns // 1_000_000} ms "
             f"hash={time_hashing_ns // 1_000_000} ms"
         )
+        log_system_stats(directory)
 
     return bool(buffer)
 
@@ -175,7 +177,9 @@ async def combine_parquet(
    async with httpx.AsyncClient(follow_redirects=True) as client:
         while repositories:
             if (
-                await fill_buffer(buffer, max_buffer_size, client, repositories)
+                await fill_buffer(
+                    buffer, max_buffer_size, client, repositories, directory
+                )
                 is False
             ):
                 continue
@@ -200,7 +204,7 @@ async def combine_parquet(
         while buffer or repositories:
             if not buffer:
                 res = await fill_buffer(
-                    buffer, max_buffer_size, client, repositories
+                    buffer, max_buffer_size, client, repositories, directory
                 )
                 if res is None:
                     continue
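
Note: the cli.py hunk skips the first 200 repositories (repos[200:]) to reproduce a failure, per the "# Debug failures..." comment, and the combine_parquet.py hunks thread a directory argument into fill_buffer so that log_system_stats(directory) can run after each buffer fill. The definition of log_system_stats is not part of this diff. As a minimal sketch only (an assumption, not the project's actual implementation), such a helper could log free disk space for the merge directory, since exhausted disk is a plausible failure mode when combining large parquet files:

# Hypothetical sketch of log_system_stats; the real definition is not shown in this diff.
import logging
import shutil
from pathlib import Path

log = logging.getLogger(__name__)


def log_system_stats(directory: Path) -> None:
    # Log disk usage for the working directory after each buffer fill,
    # to help correlate merge failures with low disk space.
    usage = shutil.disk_usage(directory)
    log.info(
        f"disk: total={usage.total // 1_000_000} MB "
        f"used={usage.used // 1_000_000} MB "
        f"free={usage.free // 1_000_000} MB"
    )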