
Commit

ok
orf committed Oct 19, 2024
1 parent f5b6bc4 commit 2917be0
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions src/pypi_data/combine_parquet.py
@@ -1,3 +1,4 @@
import hashlib
from pathlib import Path

import httpx
@@ -13,7 +14,7 @@
TARGET_SIZE = 1024 * 1024 * 1024 * 1.7  # 1.7 GB


def append_buffer(writer: pq.ParquetWriter, batch: pa.RecordBatch, roll_up_path: Path) -> bool:
def append_buffer(writer: pq.ParquetWriter, batch: RecordBatch, roll_up_path: Path) -> bool:
log.info(f"Writing batch: {batch.num_rows=} {batch.nbytes / 1024 / 1024:.1f} MB")
writer.write_batch(batch)
writer.file_handle.flush()
@@ -22,12 +23,25 @@ def append_buffer(writer: pq.ParquetWriter, batch: pa.RecordBatch, roll_up_path:
    return size >= TARGET_SIZE


async def fill_buffer(buffer: list[RecordBatch], client: httpx.AsyncClient, repo: CodeRepository, path: Path):
async def fill_buffer(buffer: list[tuple[tuple[int, range], RecordBatch]], client: httpx.AsyncClient,
                      repo: CodeRepository,
                      path: Path):
log.info(f"Downloading {repo.dataset_url}")
await repo.download_dataset(client, path)
log.info(f'Downloaded, reading {path}')
table = pq.read_table(path, memory_map=True).combine_chunks()
buffer.extend(table.to_batches(max_chunksize=2_000_000))

    start = 0
    for idx, batch in enumerate(table.to_batches(max_chunksize=2_000_000)):
        buffer.append(
            ((repo.number, range(start, start + batch.num_rows)), batch)
        )
        start += batch.num_rows


def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
    combined = "-".join(f"{number}-{r.start}-{r.stop}" for number, r in keys)
    return hashlib.sha256(combined.encode()).hexdigest()


async def combine_parquet(repositories: list[CodeRepository], directory: Path):
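Not part of the diff, but to make the new buffering scheme concrete: fill_buffer now tags every RecordBatch with a (repository number, row range) key before it goes into the shared buffer. A minimal sketch, assuming pyarrow is installed and using a made-up repository number and table:

import pyarrow as pa

repo_number = 7  # hypothetical; the real code takes this from repo.number
table = pa.table({"path": [f"file-{i}.py" for i in range(10)]})

buffer = []
start = 0
for batch in table.to_batches(max_chunksize=4):
    # Key each batch by the repository it came from and the rows it covers.
    buffer.append(((repo_number, range(start, start + batch.num_rows)), batch))
    start += batch.num_rows

for (number, rows), batch in buffer:
    print(number, rows, batch.num_rows)  # 7 range(0, 4) 4 / 7 range(4, 8) 4 / 7 range(8, 10) 2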
@@ -37,7 +51,7 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
    repo_file = directory / f"repo.parquet"

    roll_up_count = 0
    buffer: list[RecordBatch] = []
    buffer: list[tuple[tuple[int, range], RecordBatch]] = []

    async with httpx.AsyncClient(follow_redirects=True) as client:
        while repositories:
@@ -46,7 +60,9 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):

            roll_up_path = directory / f"merged-{roll_up_count}.parquet"

            first_buffer = buffer.pop(0)
            keys = []
            first_key, first_buffer = buffer.pop(0)
            keys.append(first_key)

            with pq.ParquetWriter(roll_up_path,
                                  compression="zstd",
@@ -61,10 +77,20 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
                        repo = repositories.pop(0)
                        await fill_buffer(buffer, client, repo, repo_file)

                    batch = buffer.pop(0)
                    key, batch = buffer.pop(0)
                    keys.append(key)
                    if append_buffer(writer, batch, roll_up_path):
                        break

            hashed_keys = hash_parquet_keys(keys)
            final_path = roll_up_path.rename(
                directory / f"merged-{roll_up_count}-{hashed_keys[:6]}.parquet"
            )

            log.info(
                f"Finished batch {roll_up_count}, {len(keys)} keys, {final_path.name=} {final_path.stat().st_size / 1024 / 1024:.1f} MB"
            )

            roll_up_count += 1

# for repo in repositories:
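As a rough illustration of how those keys end up in the file name (again, not part of the commit): hash_parquet_keys collapses the collected (repository, row range) pairs into a single SHA-256 digest, and the first six hex characters become the suffix of the rolled-up file. A self-contained sketch with made-up keys:

import hashlib


def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
    # Mirrors the helper above: join repo number, start and stop of each range, then hash.
    combined = "-".join(f"{number}-{r.start}-{r.stop}" for number, r in keys)
    return hashlib.sha256(combined.encode()).hexdigest()


# Hypothetical keys gathered while writing one roll-up file.
keys = [(41, range(0, 2_000_000)), (41, range(2_000_000, 3_500_000)), (42, range(0, 2_000_000))]
roll_up_count = 0
print(f"merged-{roll_up_count}-{hash_parquet_keys(keys)[:6]}.parquet")

The same set of keys always hashes to the same suffix, so re-running a roll-up over the same inputs yields the same final file name.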
