diff --git a/src/pypi_data/cli.py b/src/pypi_data/cli.py
index 86b277a1..ce500405 100644
--- a/src/pypi_data/cli.py
+++ b/src/pypi_data/cli.py
@@ -157,51 +157,6 @@ def merge_datasets(repo_path: Path, output: Path):
     asyncio.run(combine_parquet(repos, output))
 
 
-#
-#
-# @app.command()
-# def output_sql(repo_path: Path, redirects: Annotated[bool, typer.Option()] = False,
-#                batches_of: Annotated[Optional[int], typer.Option()] = None,
-#                settings: Annotated[str, typer.Option()] = '',
-#                dialect: Annotated[str, typer.Option()] = 'duckdb',
-#                insert: Annotated[bool, typer.Option()] = False):
-#     # log.info(f'Reading repos from {repo_path}')
-#     contents = _read_path(repo_path)
-#     repos = Repos.model_validate_json(contents)
-#
-#     if redirects:
-#         urls = asyncio.run(resolve_dataset_redirects(repos.root))
-#     else:
-#         urls = [r.dataset_url for r in repos.root]
-#
-#     if dialect == 'duckdb':
-#         def make_url(url: str):
-#             return f"read_parquet('{url}')"
-#     elif dialect == 'clickhouse':
-#         def make_url(url: str):
-#             return f"url('{url}', 'Parquet')"
-#     else:
-#         raise ValueError(f"Unknown dialect: {dialect}")
-#
-#     sql_queries = [
-#         f"SELECT * FROM {make_url(url)}"
-#         for url in urls
-#     ]
-#
-#     if batches_of:
-#         batches = list(itertools.batched(sql_queries, batches_of))
-#     else:
-#         batches = [sql_queries]
-#
-#     for batch in batches:
-#         query = 'INSERT INTO output\n' if insert else ''
-#         query += "\nUNION ALL ".join(batch)
-#         if settings:
-#             query += f'\nSETTINGS {settings}'
-#
-#         print(query + ';\n')
-
-
 async def resolve_dataset_redirects(
     repositories: list[CodeRepository], concurrency: int = 10
 ) -> list[str]:
diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index 9c43647a..3df228ae 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -62,8 +62,6 @@ async def fill_buffer(
             log.info(f"Failed to download {repo.dataset_url}")
             continue
 
-        table = table.combine_chunks()
-
         for idx, batch in enumerate(table.to_batches(max_chunksize=2_500_000)):
             batch: RecordBatch
             digest = hashlib.sha256()
diff --git a/src/pypi_data/datasets.py b/src/pypi_data/datasets.py
index ef0cd0c4..b99c1971 100644
--- a/src/pypi_data/datasets.py
+++ b/src/pypi_data/datasets.py
@@ -35,7 +35,6 @@ class CodeRepository(pydantic.BaseModel):
     name: str
     url: HttpUrl
     ssh_url: str
-    parquet_url: HttpUrl
     index: RepositoryIndex | None = None
 
     @pydantic.computed_field()
@@ -62,7 +61,6 @@ def fetch_all(cls, github: Github) -> list[Self]:
                 name=repo.name,
                 url=repo.html_url,
                 ssh_url=repo.ssh_url,
-                parquet_url=f"{repo.html_url}/releases/download/latest/dataset.parquet",
             )
             for repo in github.get_organization("pypi-data").get_repos()
             if repo.name.startswith("pypi-mirror-")
@@ -127,16 +125,9 @@ async def download_dataset(self, client: httpx.AsyncClient) -> pa.Table | None:
         log.info(
             f"Loading parquet file with size {len(response.content) / 1024 / 1024:.1f} MB"
         )
-        return pq.read_table(pa.py_buffer(memoryview(response.content)))
-
-        # async with client.stream("GET", str(self.dataset_url)) as response:
-        #     if self.check_response(response, follow_redirects=True) is None:
-        #         return None
-        #
-        #     with output.open("wb") as f:
-        #         async for buffer in response.aiter_bytes():
-        #             f.write(buffer)
-        #     return True
+        return pq.read_table(
+            pa.py_buffer(memoryview(response.content))
+        ).combine_chunks()
 
     def without_index(self) -> Self:
         return self.model_copy(update={"index": None})