Skip to content

Commit

Permalink
Fix table schema
Browse files Browse the repository at this point in the history
  • Loading branch information
orf committed Dec 6, 2024
1 parent dba97d3 commit a4e9a22
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/pypi_data/combine_parquet.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -75,6 +75,7 @@ async def fill_buffer(
client: httpx.AsyncClient,
repositories: Deque[CodeRepository],
directory: Path,
+ schema_merge: pa.Schema
) -> bool:
while repositories:
time_hashing_ns = 0
Expand All @@ -100,12 +101,12 @@ async def fill_buffer(

start_load_time = time.perf_counter_ns()

- table_batches = await asyncio.to_thread(
+ table = await asyncio.to_thread(
  lambda: pq.read_table(pa.py_buffer(memoryview(dataset_bytes)))
  .combine_chunks()
- .to_batches(max_chunksize=2_500_000)
  )
- del dataset_bytes
+ table_batches = table.cast(pa.unify_schemas([table.schema, schema_merge], promote_options="permissive")).to_batches(max_chunksize=2_500_000)
+ del dataset_bytes, table
time_loading_ns += time.perf_counter_ns() - start_load_time

iterator = iter(enumerate(table_batches))
Expand Down Expand Up @@ -174,13 +175,13 @@ async def combine_parquet(
f"Configured buffer size: {ByteSize(max_buffer_size).human_readable(decimal=True)}"
)

- schema_hack = pa.schema([("repository", pa.int64())])
+ schema_merge = pa.schema([("repository", pa.int64())])

async with httpx.AsyncClient(follow_redirects=True) as client:
while repositories:
if (
await fill_buffer(
- buffer, max_buffer_size, client, repositories, directory
+ buffer, max_buffer_size, client, repositories, directory, schema_merge
)
is False
):
Expand All @@ -202,15 +203,15 @@ async def combine_parquet(
write_batch_size=1024 * 10,
data_page_size=1024 * 1024 * 5,
schema=pa.unify_schemas(
- [first_buffer.schema, schema_hack], promote_options="permissive"
+ [first_buffer.schema, schema_merge], promote_options="permissive"
),
) as writer:
append_buffer(fd, writer, first_buffer, roll_up_path, target_size)

while buffer or repositories:
if not buffer:
res = await fill_buffer(
- buffer, max_buffer_size, client, repositories, directory
+ buffer, max_buffer_size, client, repositories, directory, schema_merge
)
if res is None:
continue
Expand Down

0 comments on commit a4e9a22

Please sign in to comment.