Commit: Remove parquet_url
orf committed Oct 20, 2024
1 parent bc6b124 commit 9efb6dc
Showing 3 changed files with 3 additions and 59 deletions.
45 changes: 0 additions & 45 deletions src/pypi_data/cli.py
@@ -157,51 +157,6 @@ def merge_datasets(repo_path: Path, output: Path):
asyncio.run(combine_parquet(repos, output))


-#
-#
-# @app.command()
-# def output_sql(repo_path: Path, redirects: Annotated[bool, typer.Option()] = False,
-# batches_of: Annotated[Optional[int], typer.Option()] = None,
-# settings: Annotated[str, typer.Option()] = '',
-# dialect: Annotated[str, typer.Option()] = 'duckdb',
-# insert: Annotated[bool, typer.Option()] = False):
-# # log.info(f'Reading repos from {repo_path}')
-# contents = _read_path(repo_path)
-# repos = Repos.model_validate_json(contents)
-#
-# if redirects:
-# urls = asyncio.run(resolve_dataset_redirects(repos.root))
-# else:
-# urls = [r.dataset_url for r in repos.root]
-#
-# if dialect == 'duckdb':
-# def make_url(url: str):
-# return f"read_parquet('{url}')"
-# elif dialect == 'clickhouse':
-# def make_url(url: str):
-# return f"url('{url}', 'Parquet')"
-# else:
-# raise ValueError(f"Unknown dialect: {dialect}")
-#
-# sql_queries = [
-# f"SELECT * FROM {make_url(url)}"
-# for url in urls
-# ]
-#
-# if batches_of:
-# batches = list(itertools.batched(sql_queries, batches_of))
-# else:
-# batches = [sql_queries]
-#
-# for batch in batches:
-# query = 'INSERT INTO output\n' if insert else ''
-# query += "\nUNION ALL ".join(batch)
-# if settings:
-# query += f'\nSETTINGS {settings}'
-#
-# print(query + ';\n')


async def resolve_dataset_redirects(
repositories: list[CodeRepository], concurrency: int = 10
) -> list[str]:
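The deleted block was an already commented-out typer command that printed UNION ALL queries over each repository's parquet URL for the DuckDB or ClickHouse dialects. Purely for context, here is a minimal sketch of the DuckDB-dialect query construction it contained; the helper name and example URL below are illustrative, not from the repository.

# Illustrative sketch only, mirroring the removed, commented-out output_sql logic.
def build_union_query(urls: list[str]) -> str:
    # One SELECT per parquet file, joined with UNION ALL, as in the deleted code.
    selects = [f"SELECT * FROM read_parquet('{url}')" for url in urls]
    return "\nUNION ALL ".join(selects) + ";"

print(build_union_query([
    # Example URL shape only.
    "https://github.com/pypi-data/pypi-mirror-1/releases/download/latest/dataset.parquet",
]))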
2 changes: 0 additions & 2 deletions src/pypi_data/combine_parquet.py
@@ -62,8 +62,6 @@ async def fill_buffer(
log.info(f"Failed to download {repo.dataset_url}")
continue

-table = table.combine_chunks()

for idx, batch in enumerate(table.to_batches(max_chunksize=2_500_000)):
batch: RecordBatch
digest = hashlib.sha256()
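For context, pyarrow's Table.combine_chunks() consolidates each column's chunked data into a single contiguous chunk; with this commit that consolidation happens once when the dataset is downloaded (see datasets.py below) rather than in fill_buffer. A small self-contained sketch of the behaviour, using made-up values:

import pyarrow as pa

# Concatenating tables yields columns made of multiple chunks.
t = pa.concat_tables([
    pa.table({"project": ["a", "b"]}),
    pa.table({"project": ["c"]}),
])
print(t.column("project").num_chunks)  # 2

# combine_chunks() rewrites each column as one contiguous chunk, so later
# calls like to_batches(max_chunksize=...) don't split on chunk boundaries.
combined = t.combine_chunks()
print(combined.column("project").num_chunks)  # 1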
15 changes: 3 additions & 12 deletions src/pypi_data/datasets.py
@@ -35,7 +35,6 @@ class CodeRepository(pydantic.BaseModel):
name: str
url: HttpUrl
ssh_url: str
-parquet_url: HttpUrl
index: RepositoryIndex | None = None

@pydantic.computed_field()
@@ -62,7 +61,6 @@ def fetch_all(cls, github: Github) -> list[Self]:
name=repo.name,
url=repo.html_url,
ssh_url=repo.ssh_url,
-parquet_url=f"{repo.html_url}/releases/download/latest/dataset.parquet",
)
for repo in github.get_organization("pypi-data").get_repos()
if repo.name.startswith("pypi-mirror-")
@@ -127,16 +125,9 @@ async def download_dataset(self, client: httpx.AsyncClient) -> pa.Table | None:
log.info(
f"Loading parquet file with size {len(response.content) / 1024 / 1024:.1f} MB"
)
-return pq.read_table(pa.py_buffer(memoryview(response.content)))
-
-# async with client.stream("GET", str(self.dataset_url)) as response:
-# if self.check_response(response, follow_redirects=True) is None:
-# return None
-#
-# with output.open("wb") as f:
-# async for buffer in response.aiter_bytes():
-# f.write(buffer)
-# return True
+return pq.read_table(
+pa.py_buffer(memoryview(response.content))
+).combine_chunks()

def without_index(self) -> Self:
return self.model_copy(update={"index": None})
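
The replacement return path reads the whole release artifact into memory and consolidates chunks before returning the table to the caller in combine_parquet.py. A rough, self-contained sketch of the same pattern; the URL and error handling here are illustrative assumptions, not the repository's code:

import asyncio

import httpx
import pyarrow as pa
import pyarrow.parquet as pq


async def fetch_parquet(url: str) -> pa.Table | None:
    # Download the file fully into memory, then parse it with pyarrow through a
    # buffer over the response bytes, combining chunks before returning.
    async with httpx.AsyncClient(follow_redirects=True) as client:
        response = await client.get(url)
        if response.status_code != 200:
            return None
        return pq.read_table(
            pa.py_buffer(memoryview(response.content))
        ).combine_chunks()


# Example URL shape only; real dataset URLs come from CodeRepository.
table = asyncio.run(fetch_parquet(
    "https://github.com/pypi-data/pypi-mirror-1/releases/download/latest/dataset.parquet"
))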
