Commit
Make buffer size dynamic
orf committed Oct 20, 2024
1 parent 8187e57 commit c13a016
Showing 3 changed files with 38 additions and 8 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -8,6 +8,7 @@ requires-python = "~=3.12.0"
 dependencies = [
     "hishel[sqlite]>=0.0.33",
     "httpx>=0.27.2",
+    "psutil>=6.1.0",
     "pyarrow>=17.0.0",
     "pydantic-core>=2.23.4",
     "pydantic>=2.9.2",
28 changes: 20 additions & 8 deletions src/pypi_data/combine_parquet.py
@@ -4,6 +4,7 @@
 from typing import Deque
 
 import httpx
+import psutil
 import pyarrow
 import pyarrow.parquet as pq
 import structlog
@@ -14,7 +15,6 @@
 log = structlog.get_logger()
 
 TARGET_SIZE = 1024 * 1024 * 1024 * 1.8  # 1.8 GB
-FILL_BUFFER_MEM_SIZE = 1024 * 1024 * 1024 * 5  # 5 GB
 IO_BUFFER_SIZE = 1024 * 1024 * 50  # 50 MB
 
 
@@ -43,20 +43,17 @@ def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int:
     return sum(batch.nbytes for (_, _), batch in buffer)
 
 
-def buffer_at_capacity(size: int) -> bool:
-    return size >= FILL_BUFFER_MEM_SIZE
-
-
 async def fill_buffer(
     buffer: Deque[tuple[tuple[int, str], RecordBatch]],
+    max_buffer_size: int,
     client: httpx.AsyncClient,
     repositories: Deque[CodeRepository],
     path: Path,
 ) -> bool:
     while repositories:
         buffer_size = buffer_mem_size(buffer)
         log.info(f"Buffer size: {buffer_size / 1024 / 1024:.1f} MB")
-        if buffer_at_capacity(buffer_size):
+        if buffer_size >= max_buffer_size:
             log.info(f"Buffer filled with {len(buffer)} entries")
             break
 
@@ -92,9 +89,22 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
     buffer: Deque[tuple[tuple[int, str], RecordBatch]] = deque()
     repositories: Deque[CodeRepository] = deque(repositories)
 
+    total_memory = psutil.virtual_memory().total
+    max_buffer_size = min(
+        int(total_memory * 0.75),  # 75% of total memory
+        1024 * 1024 * 1024 * 10,  # 10 GB
+    )
+    log.info(f"Total system memory: {total_memory / 1024 / 1024} MB")
+    log.info(f"Configured buffer size: {max_buffer_size / 1024 / 1024} MB")
+
     async with httpx.AsyncClient(follow_redirects=True) as client:
         while repositories:
-            if await fill_buffer(buffer, client, repositories, repo_file) is False:
+            if (
+                await fill_buffer(
+                    buffer, max_buffer_size, client, repositories, repo_file
+                )
+                is False
+            ):
                 continue
 
             roll_up_path = directory / f"merged-{roll_up_count}.parquet"
@@ -117,7 +127,9 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
 
             while buffer or repositories:
                 if not buffer:
-                    res = await fill_buffer(buffer, client, repositories, repo_file)
+                    res = await fill_buffer(
+                        buffer, max_buffer_size, client, repositories, repo_file
+                    )
                     if res is None:
                         continue
 
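As a quick illustration of the change, the new sizing logic can be exercised on its own. The sketch below is not part of the commit; it only mirrors the 75%-of-RAM / 10 GB cap from the hunk above and uses a throwaway pyarrow RecordBatch to stand in for the buffered batches whose nbytes fill_buffer sums.

import psutil
import pyarrow

# Cap the fill buffer at 75% of system memory, but never more than 10 GB,
# matching the values introduced in combine_parquet.py above.
total_memory = psutil.virtual_memory().total
max_buffer_size = min(int(total_memory * 0.75), 1024 * 1024 * 1024 * 10)

# A toy batch standing in for one buffered entry; the real code sums RecordBatch.nbytes
# over a deque of batches.
batch = pyarrow.RecordBatch.from_pydict({"x": list(range(1_000))})
buffer_size = batch.nbytes * 3  # pretend the deque currently holds three such batches

print(f"cap: {max_buffer_size / 1024 / 1024:.1f} MB")
print(f"buffer: {buffer_size / 1024 / 1024:.3f} MB")
print("at capacity:", buffer_size >= max_buffer_size)

On an 8 GB machine the cap works out to 6 GB; on anything with more than roughly 13 GB of RAM the 10 GB ceiling wins.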
17 changes: 17 additions & 0 deletions uv.lock

Some generated files are not rendered by default.
