Skip to content

Commit

Permalink
Make buffer size memory based
Browse files Browse the repository at this point in the history
  • Loading branch information
orf committed Oct 20, 2024
1 parent 56faf4f commit 27d8627
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/pypi_data/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def github_client(github_token) -> Github:

@contextlib.contextmanager
def open_path(path: Path, mode: Literal["wb", "rb"]) -> Generator[BinaryIO, None, None]:
buffer_size = 1024 * 1024 * 50
buffer_size = 1024 * 1024 * 10 # 10 MB

if path.suffix == ".gz":
with gzip.open(path, mode) as gzip_fd:
Expand Down
20 changes: 16 additions & 4 deletions src/pypi_data/combine_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
log = structlog.get_logger()

TARGET_SIZE = 1024 * 1024 * 1024 * 1.8 # 1.8 GB
FILL_BUFFER_COUNT = 4 # Download this many datasets at once
IO_BUFFER_SIZE = 1024 * 1024 * 10 # 10 MB
FILL_BUFFER_MEM_SIZE = 1024 * 1024 * 1024 * 5 # 5 GB
IO_BUFFER_SIZE = 1024 * 1024 * 50 # 50 MB


def append_buffer(
Expand All @@ -35,15 +35,27 @@ def append_buffer(
return size >= TARGET_SIZE


def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int:
return sum(batch.get_total_buffer_size() for (_, _), batch in buffer)


def buffer_at_capacity(size: int) -> bool:
return size >= FILL_BUFFER_MEM_SIZE


async def fill_buffer(
buffer: Deque[tuple[tuple[int, str], RecordBatch]],
client: httpx.AsyncClient,
repositories: Deque[CodeRepository],
path: Path,
) -> bool:
for _ in range(FILL_BUFFER_COUNT):
if not repositories:
while repositories:
buffer_size = buffer_mem_size(buffer)
log.info(f"Buffer size: {buffer_size / 1024 / 1024:.1f} MB")
if buffer_at_capacity(buffer_size):
log.info("Buffer filled")
break

repo = repositories.popleft()
log.info(f"Downloading {repo.dataset_url}")
if await repo.download_dataset(client, path) is False:
Expand Down

0 comments on commit 27d8627

Please sign in to comment.