Commit

zstd compression?
orf committed Oct 19, 2024
1 parent f0655fe commit 190e4fb
Showing 1 changed file with 1 addition and 22 deletions.
23 changes: 1 addition & 22 deletions src/pypi_data/combine_parquet.py
@@ -46,8 +46,6 @@ def hash_parquet_keys(keys: list[tuple[int, range]]) -> str:
 
 async def combine_parquet(repositories: list[CodeRepository], directory: Path):
     directory.mkdir(exist_ok=True)
-    # combined_file = directory / "combined.parquet"
-    # temp_combined = directory / "temporary.parquet"
     repo_file = directory / f"repo.parquet"
 
     roll_up_count = 0
@@ -93,29 +91,10 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path):
 
         roll_up_count += 1
 
-    # for repo in repositories:
-    #     log.info("Downloading dataset", repo=repo.name)
-    #     await repo.download_dataset(client, repo_file)
-    #
-    #     log.info("Merging dataset", repo=repo.name)
-    #     await asyncio.to_thread(append_parquet_file, temp_combined, [combined_file, repo_file])
-    #     log.info(f"Merged size: {temp_combined.stat().st_size / 1024 / 1024:.1f} MB")
-    #
-    #     if temp_combined.stat().st_size < TARGET_SIZE:
-    #         temp_combined.rename(combined_file)
-    #         repo_file.unlink()
-    #     else:
-    #         # Too big! Roll over
-    #         roll_up_count = finish_batch(combined_file, roll_up_count, directory, repo_file, temp_combined)
-    #
-    # if repo_file.exists():
-    #     finish_batch(combined_file, roll_up_count, directory, repo_file, temp_combined)
-
-
 
 def append_parquet_file(output: Path, paths: list[Path]) -> Path:
     table = pa.concat_tables(
         (pq.read_table(str(p), memory_map=True) for p in paths if p.exists()),
         promote_options="none"
     )
-    pq.write_table(table, str(output), compression="snappy")
+    pq.write_table(table, str(output), compression="zstd", compression_level=7)
     return output
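The net change swaps the Parquet codec in append_parquet_file from snappy to zstd at level 7, alongside deleting the dead commented-out merge loop. A minimal sketch (not part of the commit; the sample table, column names, and /tmp paths are illustrative) of how one might compare the two codecs' output sizes with pyarrow before committing to the switch:

import os

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical sample table; substitute a real Parquet dataset to measure.
table = pa.table({
    "project": ["requests", "numpy"] * 50_000,
    "downloads": list(range(100_000)),
})

for codec, kwargs in [("snappy", {}), ("zstd", {"compression_level": 7})]:
    out = f"/tmp/sample-{codec}.parquet"
    # write_table accepts compression_level only for codecs that support
    # levels (zstd does, snappy does not), hence the per-codec kwargs.
    pq.write_table(table, out, compression=codec, **kwargs)
    print(f"{codec}: {os.path.getsize(out) / 1024:.1f} KiB")

zstd at higher levels typically produces noticeably smaller files than snappy at the cost of slower writes, which fits the exploratory tone of the commit title.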
