Commit: Add progress

orf committed Oct 20, 2024
1 parent 09cd9a3 commit df74f10
Showing 2 changed files with 17 additions and 14 deletions.
src/pypi_data/cli.py (16 changes: 8 additions & 8 deletions)
@@ -81,30 +81,30 @@ def load_repos(
     repos = repos[:limit]
 
     repos = asyncio.run(load_indexes(repos))
-    sorted_repos = sorted(repos, key=lambda r: r.number)
+    repos = sorted(repos, key=lambda r: r.number)
 
     log.info("Loaded: writing file")
     with open_path(repos_file, mode="wb") as fd:
-        for repo in repos:
+        for repo in tqdm.tqdm(repos):
             fd.write(repo.model_dump_json().encode("utf-8"))
             fd.write(b"\n")
 
     log.info("Writing links")
 
     (links_path / "dataset.txt").write_text(
-        "\n".join(str(repo.dataset_url) for repo in sorted_repos)
+        "\n".join(str(repo.dataset_url) for repo in repos)
     )
 
     (links_path / "repositories.txt").write_text(
-        "\n".join(str(repo.url) for repo in sorted_repos)
+        "\n".join(str(repo.url) for repo in repos)
     )
 
     (links_path / "repositories_ssh.txt").write_text(
-        "\n".join(str(repo.ssh_url) for repo in sorted_repos)
+        "\n".join(str(repo.ssh_url) for repo in repos)
     )
 
     (links_path / "repositories.json").write_text(
-        Repos([r.without_index() for r in sorted_repos]).model_dump_json(
+        Repos([r.without_index() for r in repos]).model_dump_json(
             indent=2, exclude_none=True
         )
     )
@@ -113,7 +113,7 @@ def load_repos(
 
     package_map: DefaultDict[str, list[PackageIndexPackage]] = defaultdict(list)
 
-    for repo in sorted_repos:
+    for repo in repos:
         for package in repo.index.packages:
             package_map[package.project_name].append(
                 PackageIndexPackage(
@@ -135,7 +135,7 @@ def load_repos(
     log.info("Writing package index")
 
     with open_path(packages_file, mode="wb") as fd:
-        for package_index in packages:
+        for package_index in tqdm.tqdm(packages):
             fd.write(package_index.model_dump_json().encode("utf-8"))
             fd.write(b"\n")

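The cli.py hunks do two things: the sorted list is rebound to `repos` (retiring the `sorted_repos` name), and the two serialization loops gain a `tqdm.tqdm` wrapper so long-running writes report progress. A minimal sketch of the pattern, using a hypothetical list of dicts in place of the repository's real pydantic models:

import json

import tqdm

# Hypothetical stand-ins for the Repo models serialized in load_repos.
repos = [{"number": n, "url": f"https://example.invalid/repo-{n}"} for n in range(1000)]
repos = sorted(repos, key=lambda r: r["number"])

with open("repos.jsonl", "wb") as fd:
    # Wrapping the iterable in tqdm.tqdm renders a progress bar on stderr;
    # the loop body itself is unchanged, which is why the diff is so small.
    for repo in tqdm.tqdm(repos):
        fd.write(json.dumps(repo).encode("utf-8"))
        fd.write(b"\n")
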
src/pypi_data/combine_parquet.py (15 changes: 9 additions & 6 deletions)
@@ -1,5 +1,7 @@
 import hashlib
+from collections import deque
 from pathlib import Path
+from typing import Deque
 
 import httpx
 import pyarrow
@@ -30,15 +32,15 @@ def append_buffer(
 
 
 async def fill_buffer(
-    buffer: list[tuple[tuple[int, str], RecordBatch]],
+    buffer: Deque[tuple[tuple[int, str], RecordBatch]],
     client: httpx.AsyncClient,
-    repositories: list[CodeRepository],
+    repositories: Deque[CodeRepository],
     path: Path,
 ) -> bool:
     for _ in range(FILL_BUFFER_COUNT):
         if not repositories:
             break
-        repo = repositories.pop(0)
+        repo = repositories.popleft()
         log.info(f"Downloading {repo.dataset_url}")
         if await repo.download_dataset(client, path) is False:
             log.info(f"Failed to download {repo.dataset_url}")
@@ -67,7 +69,8 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
     repo_file = directory / "repo.parquet"
 
     roll_up_count = 0
-    buffer: list[tuple[tuple[int, str], RecordBatch]] = []
+    buffer: Deque[tuple[tuple[int, str], RecordBatch]] = deque()
+    repositories: Deque[CodeRepository] = deque(repositories)
 
     async with httpx.AsyncClient(follow_redirects=True) as client:
         while repositories:
@@ -77,7 +80,7 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
             roll_up_path = directory / f"merged-{roll_up_count}.parquet"
 
             keys = []
-            first_key, first_buffer = buffer.pop(0)
+            first_key, first_buffer = buffer.popleft()
             keys.append(first_key)
 
             with pq.ParquetWriter(
@@ -98,7 +101,7 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
                 ):
                     continue
 
-                key, batch = buffer.pop(0)
+                key, batch = buffer.popleft()
                 keys.append(key)
                 if append_buffer(writer, batch, roll_up_path):
                     break
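The combine_parquet.py hunks replace `list`-based queues with `collections.deque`: `list.pop(0)` is O(n) because every remaining element shifts left, while `deque.popleft()` is O(1). A small self-contained sketch of the difference, independent of this repository's types:

from collections import deque

items = list(range(5))
queue = deque(items)

# list.pop(0) shifts the remaining elements down: O(n) per call.
head_from_list = items.pop(0)

# deque.popleft() removes the head without any shifting: O(1) per call.
head_from_deque = queue.popleft()

assert head_from_list == head_from_deque == 0

One aside: since Python 3.9, `collections.deque` itself accepts subscripts (`deque[CodeRepository]`), so the `typing.Deque` import here is a compatibility choice rather than a requirement.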
