Add ruff
orf committed Dec 6, 2024
1 parent a4e9a22 commit 48a4bd2
Showing 3 changed files with 74 additions and 19 deletions.
pyproject.toml (5 additions, 0 deletions)

@@ -26,3 +26,8 @@ pypi-data = "pypi_data.cli:app"
 [build-system]
 requires = ["hatchling"]
 build-backend = "hatchling.build"
+
+[dependency-groups]
+dev = [
+    "ruff>=0.8.2",
+]
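
With ruff pinned in a dev dependency group (PEP 735), the linter can be run through uv. A minimal sketch of the intended workflow, assuming standard uv and ruff CLI usage (the commands themselves are not part of this commit):

    # Install the project together with the dev group (uv includes it by default)
    uv sync

    # Lint and reformat using the pinned ruff
    uv run ruff check .
    uv run ruff format .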
src/pypi_data/combine_parquet.py (36 additions, 19 deletions)

@@ -75,7 +75,7 @@ async def fill_buffer(
     client: httpx.AsyncClient,
     repositories: Deque[CodeRepository],
     directory: Path,
-    schema_merge: pa.Schema
+    schema_merge: pa.Schema,
 ) -> bool:
     while repositories:
         time_hashing_ns = 0
@@ -102,10 +102,13 @@ async def fill_buffer(
         start_load_time = time.perf_counter_ns()

         table = await asyncio.to_thread(
-            lambda: pq.read_table(pa.py_buffer(memoryview(dataset_bytes)))
-            .combine_chunks()
+            lambda: pq.read_table(
+                pa.py_buffer(memoryview(dataset_bytes))
+            ).combine_chunks()
         )
-        table_batches = table.cast(pa.unify_schemas([table.schema, schema_merge], promote_options="permissive")).to_batches(max_chunksize=2_500_000)
+        table_batches = table.cast(
+            pa.unify_schemas([table.schema, schema_merge], promote_options="permissive")
+        ).to_batches(max_chunksize=2_500_000)
         del dataset_bytes, table
         time_loading_ns += time.perf_counter_ns() - start_load_time
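
The permissive cast above leans on pyarrow's schema unification. A minimal sketch of the behaviour with hypothetical column names, assuming a pyarrow version that supports promote_options (as the code above already requires):

    import pyarrow as pa

    # Two overlapping schemas that disagree on the type of "count".
    a = pa.schema([("name", pa.string()), ("count", pa.int32())])
    b = pa.schema([("count", pa.int64()), ("extra", pa.bool_())])

    # promote_options="permissive" widens int32 -> int64 instead of raising,
    # which is what lets per-repository tables be cast to one merged schema.
    merged = pa.unify_schemas([a, b], promote_options="permissive")
    print(merged)  # name: string, count: int64, extra: bool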

@@ -181,7 +184,12 @@ async def combine_parquet(
     while repositories:
         if (
             await fill_buffer(
-                buffer, max_buffer_size, client, repositories, directory, schema_merge
+                buffer,
+                max_buffer_size,
+                client,
+                repositories,
+                directory,
+                schema_merge,
             )
             is False
         ):
@@ -193,25 +201,34 @@
         first_key, first_buffer = buffer.popleft()
         keys.append(first_key)

-        with pyarrow.output_stream(
-            roll_up_path, compression=None, buffer_size=IO_BUFFER_SIZE
-        ) as fd, pq.ParquetWriter(
-            fd,
-            compression="zstd",
-            compression_level=9,
-            write_statistics=True,
-            write_batch_size=1024 * 10,
-            data_page_size=1024 * 1024 * 5,
-            schema=pa.unify_schemas(
-                [first_buffer.schema, schema_merge], promote_options="permissive"
-            ),
-        ) as writer:
+        with (
+            pyarrow.output_stream(
+                roll_up_path, compression=None, buffer_size=IO_BUFFER_SIZE
+            ) as fd,
+            pq.ParquetWriter(
+                fd,
+                compression="zstd",
+                compression_level=9,
+                write_statistics=True,
+                write_batch_size=1024 * 10,
+                data_page_size=1024 * 1024 * 5,
+                schema=pa.unify_schemas(
+                    [first_buffer.schema, schema_merge],
+                    promote_options="permissive",
+                ),
+            ) as writer,
+        ):
             append_buffer(fd, writer, first_buffer, roll_up_path, target_size)

             while buffer or repositories:
                 if not buffer:
                     res = await fill_buffer(
-                        buffer, max_buffer_size, client, repositories, directory, schema_merge
+                        buffer,
+                        max_buffer_size,
+                        client,
+                        repositories,
+                        directory,
+                        schema_merge,
                     )
                     if res is None:
                         continue
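
The rewritten with-statement uses parenthesized context managers, valid since Python 3.10 and the form ruff's formatter emits for multiple managers. A minimal standalone sketch with hypothetical file names:

    # Python 3.10+: multiple context managers in parentheses, one per line,
    # with a trailing comma; both files are closed when the block exits.
    with (
        open("input.txt") as src,
        open("output.txt", "w") as dst,
    ):
        dst.write(src.read())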
uv.lock (33 additions, 0 deletions)

Some generated files are not rendered by default.
