Commit

Remove MB suffix

orf committed Oct 20, 2024
1 parent 22e4181 commit e98ad6c

Showing 3 changed files with 20 additions and 17 deletions.
4 changes: 3 additions & 1 deletion src/pypi_data/cli.py
@@ -158,11 +158,13 @@ def merge_datasets(
     repo_path: Path,
     output: Path,
     max_buffer_size: Annotated[str, typer.Option()] = "10GB",
+    target_size: Annotated[str, typer.Option()] = "1.8GB",
 ):
     with open_path(repo_path, mode="rb") as fd:
         repos = Repos.model_validate_json(fd.read()).root
     max_buffer_size = pydantic.RootModel[ByteSize].model_validate(max_buffer_size).root
-    asyncio.run(combine_parquet(repos, output, max_buffer_size))
+    target_size = pydantic.RootModel[ByteSize].model_validate(target_size).root
+    asyncio.run(combine_parquet(repos, output, max_buffer_size, target_size))
 
 
 async def resolve_dataset_redirects(
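
For context, the new --target-size option follows the same path as --max-buffer-size: a human-readable default ("1.8GB") that merge_datasets validates into an integer byte count with pydantic's ByteSize. A minimal sketch of that round-trip outside the typer command, assuming pydantic v2's decimal reading of "GB":

    import pydantic
    from pydantic import ByteSize

    # Same validation round-trip as in merge_datasets(): string in, integer bytes out.
    max_buffer_size = pydantic.RootModel[ByteSize].model_validate("10GB").root
    target_size = pydantic.RootModel[ByteSize].model_validate("1.8GB").root

    print(int(max_buffer_size), int(target_size))  # 10000000000 1800000000
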
31 changes: 16 additions & 15 deletions src/pypi_data/combine_parquet.py
@@ -18,7 +18,6 @@
 
 log = structlog.get_logger()
 
-TARGET_SIZE = 1024 * 1024 * 1024 * 1.8  # 1.8 GB
 IO_BUFFER_SIZE = 1024 * 1024 * 50  # 50 MB
 
 
@@ -27,6 +26,7 @@ def append_buffer(
     writer: pq.ParquetWriter,
     batch: RecordBatch,
     roll_up_path: Path,
+    target_size: int,
 ) -> bool:
     initial_size = roll_up_path.stat().st_size
     writer.write_batch(batch)
@@ -35,12 +35,12 @@
     written_size = end_size - initial_size
     log.info(
         f"Wrote {batch.num_rows} rows "
-        f"Batch Size: {ByteSize(batch.nbytes).human_readable(decimal=True)} MB "
-        f"Initial Size: {ByteSize(initial_size).human_readable(decimal=True)} MB "
-        f"End Size: {ByteSize(end_size).human_readable(decimal=True)} MB "
-        f"Written: {ByteSize(written_size).human_readable(decimal=True)} MB"
+        f"Batch Size: {ByteSize(batch.nbytes).human_readable(decimal=True)} "
+        f"Initial Size: {ByteSize(initial_size).human_readable(decimal=True)} "
+        f"End Size: {ByteSize(end_size).human_readable(decimal=True)} "
+        f"Written: {ByteSize(written_size).human_readable(decimal=True)}"
     )
-    return end_size >= TARGET_SIZE
+    return end_size >= target_size
 
 
 def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int:
@@ -61,9 +61,7 @@ async def fill_buffer(
         start_time_ns = time.perf_counter_ns()
 
         buffer_size = buffer_mem_size(buffer)
-        log.info(
-            f"Buffer size: {ByteSize(buffer_size).human_readable(decimal=True)} MB"
-        )
+        log.info(f"Buffer size: {ByteSize(buffer_size).human_readable(decimal=True)}")
         if buffer_size >= max_buffer_size:
             log.info(f"Buffer filled with {len(buffer)} entries")
             break
@@ -129,7 +127,10 @@ def hash_parquet_keys(keys: list[tuple[int, str]]) -> str:
 
 
 async def combine_parquet(
-    repositories: list[CodeRepository], directory: Path, max_buffer_size: ByteSize
+    repositories: list[CodeRepository],
+    directory: Path,
+    max_buffer_size: ByteSize,
+    target_size: ByteSize,
 ):
     directory.mkdir(exist_ok=True)
 
@@ -143,10 +144,10 @@
         max_buffer_size,  # 10 GB
     )
     log.info(
-        f"Total system memory: {ByteSize(total_memory).human_readable(decimal=True)} MB"
+        f"Total system memory: {ByteSize(total_memory).human_readable(decimal=True)}"
     )
     log.info(
-        f"Configured buffer size: {ByteSize(max_buffer_size).human_readable(decimal=True)} MB"
+        f"Configured buffer size: {ByteSize(max_buffer_size).human_readable(decimal=True)}"
     )
 
     async with httpx.AsyncClient(follow_redirects=True) as client:
@@ -172,7 +173,7 @@
             write_statistics=True,
             schema=first_buffer.schema,
         ) as writer:
-            append_buffer(fd, writer, first_buffer, roll_up_path)
+            append_buffer(fd, writer, first_buffer, roll_up_path, target_size)
 
             while buffer or repositories:
                 if not buffer:
@@ -184,7 +185,7 @@
 
                 key, batch = buffer.popleft()
                 keys.append(key)
-                if append_buffer(fd, writer, batch, roll_up_path):
+                if append_buffer(fd, writer, batch, roll_up_path, target_size):
                     break
 
         hashed_keys = hash_parquet_keys(keys)
@@ -193,7 +194,7 @@
         )
 
         log.info(
-            f"Finished batch {roll_up_count}, {len(keys)} batches, {final_path.name=} {ByteSize(final_path.stat().st_size).human_readable(decimal=True)} MB"
+            f"Finished batch {roll_up_count}, {len(keys)} batches, {final_path.name=} {ByteSize(final_path.stat().st_size).human_readable(decimal=True)}"
         )
 
         roll_up_count += 1
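
The target_size value is what drives file rotation here: append_buffer reports whether the on-disk roll-up file has crossed the threshold, and the caller breaks out of its loop to start the next file. A simplified, self-contained sketch of that rotation pattern; the write_batches helper and in-memory byte counting are hypothetical stand-ins for the real pq.ParquetWriter and roll_up_path.stat() calls:

    from collections import deque
    from typing import Deque

    def write_batches(batches: Deque[bytes], target_size: int) -> list[list[bytes]]:
        """Group batches into 'files', starting a new one once a file reaches target_size."""
        files: list[list[bytes]] = []
        current: list[bytes] = []
        current_size = 0

        while batches:
            batch = batches.popleft()
            current.append(batch)
            current_size += len(batch)
            if current_size >= target_size:  # mirrors `return end_size >= target_size`
                files.append(current)
                current, current_size = [], 0

        if current:
            files.append(current)
        return files

    # Five 5 MiB "batches" rolled up with a 12 MiB target: two output files.
    batches: Deque[bytes] = deque(b"x" * (5 * 1024 * 1024) for _ in range(5))
    print([sum(map(len, f)) for f in write_batches(batches, 12 * 1024 * 1024)])
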
2 changes: 1 addition & 1 deletion src/pypi_data/datasets.py
@@ -124,7 +124,7 @@ async def download_dataset(self, client: httpx.AsyncClient) -> bytes | None:
         if response is None:
             return None
         log.info(
-            f"Downloaded parquet file with size {ByteSize(len(response.content)).human_readable(decimal=True)} MB"
+            f"Downloaded parquet file with size {ByteSize(len(response.content)).human_readable(decimal=True)}"
         )
         return response.content
 
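Across all three files the change is the same: pydantic's ByteSize.human_readable() already appends a unit to the value it formats (decimal units such as KB/MB/GB when decimal=True, binary units such as KiB/MiB/GiB otherwise), so the hard-coded " MB" suffix in these log lines duplicated, and usually contradicted, the real unit. A quick illustration; exact formatting may vary slightly between pydantic versions:

    from pydantic import ByteSize

    size = ByteSize(1_800_000_000)

    # human_readable() includes the unit itself, so a literal " MB" suffix
    # would have produced strings like "1.8GB MB" in the logs.
    print(size.human_readable(decimal=True))  # e.g. "1.8GB"
    print(size.human_readable())              # e.g. "1.7GiB"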