diff --git a/src/pypi_data/cli.py b/src/pypi_data/cli.py
index add55768..9993799d 100644
--- a/src/pypi_data/cli.py
+++ b/src/pypi_data/cli.py
@@ -54,7 +54,7 @@ def github_client(github_token) -> Github:
 
 @contextlib.contextmanager
 def open_path(path: Path, mode: Literal["wb", "rb"]) -> Generator[BinaryIO, None, None]:
-    buffer_size = 1024 * 1024 * 50
+    buffer_size = 1024 * 1024 * 10  # 10 MB
 
     if path.suffix == ".gz":
         with gzip.open(path, mode) as gzip_fd:
diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index 9f37ad17..2e6948fc 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -14,8 +14,8 @@
 log = structlog.get_logger()
 
 TARGET_SIZE = 1024 * 1024 * 1024 * 1.8  # 1.8 GB
-FILL_BUFFER_COUNT = 4  # Download this many datasets at once
-IO_BUFFER_SIZE = 1024 * 1024 * 10  # 10 MB
+FILL_BUFFER_MEM_SIZE = 1024 * 1024 * 1024 * 5  # 5 GB
+IO_BUFFER_SIZE = 1024 * 1024 * 50  # 50 MB
 
 
 def append_buffer(
@@ -35,15 +35,27 @@ def append_buffer(
     return size >= TARGET_SIZE
 
 
+def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int:
+    return sum(batch.get_total_buffer_size() for (_, _), batch in buffer)
+
+
+def buffer_at_capacity(size: int) -> bool:
+    return size >= FILL_BUFFER_MEM_SIZE
+
+
 async def fill_buffer(
     buffer: Deque[tuple[tuple[int, str], RecordBatch]],
     client: httpx.AsyncClient,
     repositories: Deque[CodeRepository],
     path: Path,
 ) -> bool:
-    for _ in range(FILL_BUFFER_COUNT):
-        if not repositories:
+    while repositories:
+        buffer_size = buffer_mem_size(buffer)
+        log.info(f"Buffer size: {buffer_size / 1024 / 1024:.1f} MB")
+        if buffer_at_capacity(buffer_size):
+            log.info("Buffer filled")
             break
+
+        repo = repositories.popleft()
         log.info(f"Downloading {repo.dataset_url}")
         if await repo.download_dataset(client, path) is False:
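
For reviewers, a minimal, self-contained sketch of the new memory-capped fill policy. It is not part of the patch: the scaled-down 16 MB cap and the `make_batch` helper are invented for illustration, and only `pyarrow` is required. It demonstrates that `RecordBatch.get_total_buffer_size()` reports the in-memory Arrow buffer bytes that the cap is compared against.

```python
from collections import deque
from typing import Deque

import pyarrow as pa

# Hypothetical, scaled-down cap for illustration; the patch uses 5 GB.
FILL_BUFFER_MEM_SIZE = 1024 * 1024 * 16  # 16 MB


def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], pa.RecordBatch]]) -> int:
    # Sum the bytes of every Arrow buffer referenced by each queued batch.
    return sum(batch.get_total_buffer_size() for (_, _), batch in buffer)


def buffer_at_capacity(size: int) -> bool:
    return size >= FILL_BUFFER_MEM_SIZE


def make_batch(rows: int) -> pa.RecordBatch:
    # Invented helper: one int64 column, roughly 8 bytes per row.
    return pa.RecordBatch.from_arrays(
        [pa.array(range(rows), type=pa.int64())], names=["n"]
    )


buffer: Deque[tuple[tuple[int, str], pa.RecordBatch]] = deque()
i = 0
while not buffer_at_capacity(buffer_mem_size(buffer)):
    # Each appended batch is ~8 MB, so this stops after two batches.
    buffer.append(((i, f"repo-{i}"), make_batch(1_000_000)))
    i += 1

print(f"stopped after {i} batches, {buffer_mem_size(buffer) / 1024 / 1024:.1f} MB buffered")
```

Compared to the old `FILL_BUFFER_COUNT = 4`, this bounds the prefetch queue by actual memory use rather than dataset count, so many small datasets can be buffered at once while a few large ones stop the fill early.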