Skip to content

Commit

Permalink
Add flush back, offload computation
Browse files (browse the repository at this point in the history)
  • Loading branch information
orf committed Oct 20, 2024
1 parent fcb3666 commit 1d6518e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/pypi_data/combine_parquet.py
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ def append_buffer(
) -> bool:
initial_size = roll_up_path.stat().st_size
writer.write_batch(batch)
# fd.flush()
fd.flush()
end_size = roll_up_path.stat().st_size
written_size = end_size - initial_size
log.info(
Expand Down
13 changes: 9 additions & 4 deletions src/pypi_data/datasets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from datetime import datetime
from http import HTTPStatus
from typing import Self
@@ -111,7 +112,9 @@ async def with_index(self, client: httpx.AsyncClient) -> Self | None:
if response is None:
return None
try:
index = RepositoryIndex.model_validate_json(response.content)
index = await asyncio.to_thread(
RepositoryIndex.model_validate_json, response.content
)
return self.model_copy(update={"index": index})
except Exception as e:
raise RuntimeError(
@@ -125,9 +128,11 @@ async def download_dataset(self, client: httpx.AsyncClient) -> pa.Table | None:
log.info(
f"Loading parquet file with size {len(response.content) / 1024 / 1024:.1f} MB"
)
return pq.read_table(
pa.py_buffer(memoryview(response.content))
).combine_chunks()
return await asyncio.to_thread(
lambda: pq.read_table(
pa.py_buffer(memoryview(response.content))
).combine_chunks()
)

def without_index(self) -> Self:
return self.model_copy(update={"index": None})

0 comments on commit 1d6518e

Please sign in to comment.