diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py
index 24e35a76..29d40ec9 100644
--- a/src/pypi_data/combine_parquet.py
+++ b/src/pypi_data/combine_parquet.py
@@ -46,8 +46,6 @@ def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
 
 async def combine_parquet(repositories: list[CodeRepository], directory: Path):
     directory.mkdir(exist_ok=True)
-    # combined_file = directory / "combined.parquet"
-    # temp_combined = directory / "temporary.parquet"
     repo_file = directory / f"repo.parquet"
 
     roll_up_count = 0
@@ -93,29 +91,10 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
 
             roll_up_count += 1
 
-    # for repo in repositories:
-    #     log.info("Downloading dataset", repo=repo.name)
-    #     await repo.download_dataset(client, repo_file)
-    #
-    #     log.info("Merging dataset", repo=repo.name)
-    #     await asyncio.to_thread(append_parquet_file, temp_combined, [combined_file, repo_file])
-    #     log.info(f"Merged size: {temp_combined.stat().st_size / 1024 / 1024:.1f} MB")
-    #
-    #     if temp_combined.stat().st_size < TARGET_SIZE:
-    #         temp_combined.rename(combined_file)
-    #         repo_file.unlink()
-    #     else:
-    #         # Too big! Roll over
-    #         roll_up_count = finish_batch(combined_file, roll_up_count, directory, repo_file, temp_combined)
-    #
-    # if repo_file.exists():
-    #     finish_batch(combined_file, roll_up_count, directory, repo_file, temp_combined)
-
-
 
 def append_parquet_file(output: Path, paths: list[Path]) -> Path:
     table = pa.concat_tables(
         (pq.read_table(str(p), memory_map=True) for p in paths if p.exists()), promote_options="none"
     )
-    pq.write_table(table, str(output), compression="snappy")
+    pq.write_table(table, str(output), compression="zstd", compression_level=7)
     return output
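
Aside from deleting the commented-out rollup loop, the one behavioral change in this diff is the Parquet codec: snappy is swapped for zstd at compression level 7. Below is a minimal standalone sketch, not part of the diff itself, showing the same pyarrow call and a quick way to compare the resulting file sizes; the table contents and output file names are made up for illustration.

import os

import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative table; the real inputs are the per-repository datasets
# that combine_parquet() downloads and merges.
table = pa.table({"project": ["a", "b"] * 50_000, "size": list(range(100_000))})

# Old codec: fast to write, lightly compressed.
pq.write_table(table, "snappy.parquet", compression="snappy")

# New codec: slower to write, but typically a noticeably smaller file.
pq.write_table(table, "zstd.parquet", compression="zstd", compression_level=7)

for name in ("snappy.parquet", "zstd.parquet"):
    print(name, os.path.getsize(name), "bytes")

The trade-off seems deliberate: zstd at level 7 costs more CPU at write time in exchange for a smaller on-disk footprint, which is the sensible direction for rolled-up archives that are written once and downloaded many times.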