From 1d6518e668eceb2304bf7d5fdec6ba5fda893dfd Mon Sep 17 00:00:00 2001
From: Tom Forbes
Date: Sun, 20 Oct 2024 13:28:56 +0100
Subject: [PATCH] Add flush back, offload computation

---
 src/pypi_data/combine_parquet.py |  2 +-
 src/pypi_data/datasets.py        | 13 +++++++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index d1563a0c..deb20421 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -26,7 +26,7 @@ def append_buffer(
 ) -> bool:
     initial_size = roll_up_path.stat().st_size
     writer.write_batch(batch)
-    # fd.flush()
+    fd.flush()
     end_size = roll_up_path.stat().st_size
     written_size = end_size - initial_size
     log.info(
diff --git a/src/pypi_data/datasets.py b/src/pypi_data/datasets.py
index b99c1971..110543a0 100644
--- a/src/pypi_data/datasets.py
+++ b/src/pypi_data/datasets.py
@@ -1,3 +1,4 @@
+import asyncio
 from datetime import datetime
 from http import HTTPStatus
 from typing import Self
@@ -111,7 +112,9 @@ async def with_index(self, client: httpx.AsyncClient) -> Self | None:
         if response is None:
             return None
         try:
-            index = RepositoryIndex.model_validate_json(response.content)
+            index = await asyncio.to_thread(
+                RepositoryIndex.model_validate_json, response.content
+            )
             return self.model_copy(update={"index": index})
         except Exception as e:
             raise RuntimeError(
@@ -125,9 +128,11 @@ async def download_dataset(self, client: httpx.AsyncClient) -> pa.Table | None:
         log.info(
             f"Loading parquet file with size {len(response.content) / 1024 / 1024:.1f} MB"
         )
-        return pq.read_table(
-            pa.py_buffer(memoryview(response.content))
-        ).combine_chunks()
+        return await asyncio.to_thread(
+            lambda: pq.read_table(
+                pa.py_buffer(memoryview(response.content))
+            ).combine_chunks()
+        )
 
     def without_index(self) -> Self:
         return self.model_copy(update={"index": None})