
Commit

Try this
orf committed Oct 20, 2024
1 parent daea021 commit ab7fa21
Showing 2 changed files with 29 additions and 19 deletions.
15 changes: 9 additions & 6 deletions src/pypi_data/combine_parquet.py
@@ -26,9 +26,11 @@ def append_buffer(writer: pq.ParquetWriter, batch: RecordBatch, roll_up_path: Pa
 
 async def fill_buffer(buffer: list[tuple[tuple[int, str], RecordBatch]], client: httpx.AsyncClient,
                       repo: CodeRepository,
-                      path: Path):
+                      path: Path) -> bool:
     log.info(f"Downloading {repo.dataset_url}")
-    await repo.download_dataset(client, path)
+    if await repo.download_dataset(client, path) is False:
+        log.info(f"Failed to download {repo.dataset_url}")
+        return False
     log.info(f'Downloaded, reading {path}')
     table = pq.read_table(path, memory_map=True).combine_chunks()
 
@@ -43,6 +45,7 @@ async def fill_buffer(buffer: list[tuple[tuple[int, str], RecordBatch]], client:
             ((repo.number, digest), batch)
         )
         start += batch.num_rows
+    return True
 
 
 def hash_parquet_keys(keys: list[tuple[int, str]]) -> str:
@@ -59,8 +62,8 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
 
     async with httpx.AsyncClient(follow_redirects=True) as client:
         while repositories:
-            repo = repositories.pop(0)
-            await fill_buffer(buffer, client, repo, repo_file)
+            if await fill_buffer(buffer, client, repositories.pop(0), repo_file) is False:
+                continue
 
             roll_up_path = directory / f"merged-{roll_up_count}.parquet"
 
@@ -78,8 +81,8 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
 
             while buffer or repositories:
                 if not buffer:
-                    repo = repositories.pop(0)
-                    await fill_buffer(buffer, client, repo, repo_file)
+                    if await fill_buffer(buffer, client, repositories.pop(0), repo_file) is False:
+                        continue
 
                 key, batch = buffer.pop(0)
                 keys.append(key)
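The net effect of the combine_parquet.py changes is that a failed download no longer aborts the whole roll-up: download_dataset reports failure as a boolean, fill_buffer propagates it, and the merge loops skip that repository and carry on. Below is a minimal, runnable sketch of the pattern; FakeRepo, fill_buffer, and combine here are hypothetical stand-ins, not the project's CodeRepository or combine_parquet:

```python
import asyncio


class FakeRepo:
    """Hypothetical stand-in for the project's CodeRepository."""

    def __init__(self, number: int, ok: bool):
        self.number = number
        self.ok = ok

    async def download_dataset(self) -> bool:
        # Signal failure with a return value instead of raising, as in the commit.
        return self.ok


async def fill_buffer(buffer: list[int], repo: FakeRepo) -> bool:
    if await repo.download_dataset() is False:
        print(f"Failed to download repo {repo.number}")
        return False
    buffer.append(repo.number)  # stand-in for appending parquet RecordBatches
    return True


async def combine(repositories: list[FakeRepo]) -> list[int]:
    buffer: list[int] = []
    while repositories:
        # A repository whose dataset cannot be fetched is skipped, not fatal.
        if await fill_buffer(buffer, repositories.pop(0)) is False:
            continue
    return buffer


print(asyncio.run(combine([FakeRepo(1, True), FakeRepo(2, False), FakeRepo(3, True)])))
# prints "Failed to download repo 2", then [1, 3]
```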
33 changes: 20 additions & 13 deletions src/pypi_data/datasets.py
@@ -65,23 +65,26 @@ def fetch_all(cls, github: Github) -> list[Self]:
             if repo.name.startswith("pypi-mirror-")
         ]
 
+    def check_response(self, response: httpx.Response, follow_redirects: bool):
+        try:
+            response.raise_for_status()
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code in (HTTPStatus.NOT_FOUND, HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS):
+                log.warning(f"URL {response.url} not found for {self.name}")
+                return None
+            if not follow_redirects and e.response.is_redirect:
+                return e.response
+            raise
+        return response
+
     async def _make_request(self, client: httpx.AsyncClient, url: HttpUrl,
                             follow_redirects: bool = True) -> httpx.Response | None:
         async for attempt in AsyncRetrying(stop=stop_after_attempt(3),
                                            wait=wait_random_exponential(multiplier=1, max=4)):
             with attempt:
                 response = await client.get(str(url), headers={'Accept-Encoding': 'gzip'},
                                             follow_redirects=follow_redirects)
-                try:
-                    response.raise_for_status()
-                except httpx.HTTPStatusError as e:
-                    if e.response.status_code in (HTTPStatus.NOT_FOUND, HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS):
-                        log.warning(f"URL {url} not found for {self.name}")
-                        return None
-                    if not follow_redirects and e.response.is_redirect:
-                        return e.response
-                    raise
-                return response
+                return self.check_response(response, follow_redirects)
 
     async def get_temporary_dataset_url(self, client: httpx.AsyncClient) -> str | None:
         response = await self._make_request(client, self.dataset_url, follow_redirects=True)
@@ -98,11 +101,15 @@ async def with_index(self, client: httpx.AsyncClient) -> Self:
         except Exception as e:
             raise RuntimeError(f'{self.index_url} failed to parse: {len(response.content)=}') from e
 
-    async def download_dataset(self, client: httpx.AsyncClient, output: Path):
-        async with client.stream("GET", str(self.dataset_url)) as resp:
+    async def download_dataset(self, client: httpx.AsyncClient, output: Path) -> bool:
+        async with client.stream("GET", str(self.dataset_url)) as response:
+            if self.check_response(response, follow_redirects=True) is None:
+                return False
+
             with output.open("wb") as f:
-                async for buffer in resp.aiter_bytes():
+                async for buffer in response.aiter_bytes():
                     f.write(buffer)
+            return True
 
     def without_index(self) -> Self:
         return self.model_copy(update={'index': None})
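check_response factors the status handling out of _make_request so the streaming download path can share it. The contract has three outcomes: the response itself on success, None for tolerated statuses (404, and 451 for datasets removed for legal reasons), and a re-raised HTTPStatusError otherwise, which is what lets the surrounding AsyncRetrying loop retry only genuinely unexpected failures; a redirect is handed back untouched when the caller opted out of following redirects. A standalone sketch of that contract, written as a plain function with the same shape rather than the project's method:

```python
from http import HTTPStatus

import httpx


def check_response(response: httpx.Response, follow_redirects: bool) -> httpx.Response | None:
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        # Tolerated: the dataset is missing, or was removed for legal reasons.
        if e.response.status_code in (HTTPStatus.NOT_FOUND, HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS):
            return None
        # Hand a redirect back to the caller when it opted out of following them.
        if not follow_redirects and e.response.is_redirect:
            return e.response
        raise  # anything else propagates, so an outer retry loop can re-run the request
    return response


# httpx responses can be built offline for a quick demonstration:
req = httpx.Request("GET", "https://example.invalid/dataset.parquet")
print(check_response(httpx.Response(200, request=req), True))   # the 200 response itself
print(check_response(httpx.Response(404, request=req), True))   # None (tolerated)
print(check_response(
    httpx.Response(302, headers={"location": "/elsewhere"}, request=req),
    follow_redirects=False))                                    # the 302 response
```

Because None is reserved for "tolerated absence", callers such as download_dataset can map it directly to a False return instead of wrapping the request in try/except.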
