From c13a0161fc85dbc31524bc9071b1b8e71c4536fd Mon Sep 17 00:00:00 2001 From: Tom Forbes Date: Sun, 20 Oct 2024 12:52:56 +0100 Subject: [PATCH] Make buffer size dynamic --- pyproject.toml | 1 + src/pypi_data/combine_parquet.py | 28 ++++++++++++++++++++-------- uv.lock | 17 +++++++++++++++++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index adf3e85e..9b3c1d96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ requires-python = "~=3.12.0" dependencies = [ "hishel[sqlite]>=0.0.33", "httpx>=0.27.2", + "psutil>=6.1.0", "pyarrow>=17.0.0", "pydantic-core>=2.23.4", "pydantic>=2.9.2", diff --git a/src/pypi_data/combine_parquet.py b/src/pypi_data/combine_parquet.py index 326d61bc..31a01c06 100644 --- a/src/pypi_data/combine_parquet.py +++ b/src/pypi_data/combine_parquet.py @@ -4,6 +4,7 @@ from typing import Deque import httpx +import psutil import pyarrow import pyarrow.parquet as pq import structlog @@ -14,7 +15,6 @@ log = structlog.get_logger() TARGET_SIZE = 1024 * 1024 * 1024 * 1.8 # 1.8 GB -FILL_BUFFER_MEM_SIZE = 1024 * 1024 * 1024 * 5 # 5 GB IO_BUFFER_SIZE = 1024 * 1024 * 50 # 50 MB @@ -43,12 +43,9 @@ def buffer_mem_size(buffer: Deque[tuple[tuple[int, str], RecordBatch]]) -> int: return sum(batch.nbytes for (_, _), batch in buffer) -def buffer_at_capacity(size: int) -> bool: - return size >= FILL_BUFFER_MEM_SIZE - - async def fill_buffer( buffer: Deque[tuple[tuple[int, str], RecordBatch]], + max_buffer_size: int, client: httpx.AsyncClient, repositories: Deque[CodeRepository], path: Path, @@ -56,7 +53,7 @@ async def fill_buffer( while repositories: buffer_size = buffer_mem_size(buffer) log.info(f"Buffer size: {buffer_size / 1024 / 1024:.1f} MB") - if buffer_at_capacity(buffer_size): + if buffer_size >= max_buffer_size: log.info(f"Buffer filled with {len(buffer)} entries") break @@ -92,9 +89,22 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path): buffer: Deque[tuple[tuple[int, str], RecordBatch]] = deque() repositories: Deque[CodeRepository] = deque(repositories) + total_memory = psutil.virtual_memory().total + max_buffer_size = min( + int(total_memory * 0.75), # 75% of total memory + 1024 * 1024 * 1024 * 10, # 10 GB + ) + log.info(f"Total system memory: {total_memory / 1024 / 1024} MB") + log.info(f"Configured buffer size: {max_buffer_size / 1024 / 1024} MB") + async with httpx.AsyncClient(follow_redirects=True) as client: while repositories: - if await fill_buffer(buffer, client, repositories, repo_file) is False: + if ( + await fill_buffer( + buffer, max_buffer_size, client, repositories, repo_file + ) + is False + ): continue roll_up_path = directory / f"merged-{roll_up_count}.parquet" @@ -117,7 +127,9 @@ async def combine_parquet(repositories: list[CodeRepository], directory: Path): while buffer or repositories: if not buffer: - res = await fill_buffer(buffer, client, repositories, repo_file) + res = await fill_buffer( + buffer, max_buffer_size, client, repositories, repo_file + ) if res is None: continue diff --git a/uv.lock b/uv.lock index 876e91af..21ed5e84 100644 --- a/uv.lock +++ b/uv.lock @@ -235,6 +235,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4c/79/73735a6a5dad6059c085f240a4e74c9270feccd2bc66e4d31b5ca01d329c/numpy-2.1.2-cp312-cp312-win_amd64.whl", hash = "sha256:456e3b11cb79ac9946c822a56346ec80275eaf2950314b249b512896c0d2505e", size = 12568254 }, ] +[[package]] +name = "psutil" +version = "6.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/26/10/2a30b13c61e7cf937f4adf90710776b7918ed0a9c434e2c38224732af310/psutil-6.1.0.tar.gz", hash = "sha256:353815f59a7f64cdaca1c0307ee13558a0512f6db064e92fe833784f08539c7a", size = 508565 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/01/9e/8be43078a171381953cfee33c07c0d628594b5dbfc5157847b85022c2c1b/psutil-6.1.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6e2dcd475ce8b80522e51d923d10c7871e45f20918e027ab682f94f1c6351688", size = 247762 }, + { url = "https://files.pythonhosted.org/packages/1d/cb/313e80644ea407f04f6602a9e23096540d9dc1878755f3952ea8d3d104be/psutil-6.1.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:0895b8414afafc526712c498bd9de2b063deaac4021a3b3c34566283464aff8e", size = 248777 }, + { url = "https://files.pythonhosted.org/packages/65/8e/bcbe2025c587b5d703369b6a75b65d41d1367553da6e3f788aff91eaf5bd/psutil-6.1.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9dcbfce5d89f1d1f2546a2090f4fcf87c7f669d1d90aacb7d7582addece9fb38", size = 284259 }, + { url = "https://files.pythonhosted.org/packages/58/4d/8245e6f76a93c98aab285a43ea71ff1b171bcd90c9d238bf81f7021fb233/psutil-6.1.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:498c6979f9c6637ebc3a73b3f87f9eb1ec24e1ce53a7c5173b8508981614a90b", size = 287255 }, + { url = "https://files.pythonhosted.org/packages/27/c2/d034856ac47e3b3cdfa9720d0e113902e615f4190d5d1bdb8df4b2015fb2/psutil-6.1.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d905186d647b16755a800e7263d43df08b790d709d575105d419f8b6ef65423a", size = 288804 }, + { url = "https://files.pythonhosted.org/packages/ea/55/5389ed243c878725feffc0d6a3bc5ef6764312b6fc7c081faaa2cfa7ef37/psutil-6.1.0-cp37-abi3-win32.whl", hash = "sha256:1ad45a1f5d0b608253b11508f80940985d1d0c8f6111b5cb637533a0e6ddc13e", size = 250386 }, + { url = "https://files.pythonhosted.org/packages/11/91/87fa6f060e649b1e1a7b19a4f5869709fbf750b7c8c262ee776ec32f3028/psutil-6.1.0-cp37-abi3-win_amd64.whl", hash = "sha256:a8fb3752b491d246034fa4d279ff076501588ce8cbcdbb62c32fd7a377d996be", size = 254228 }, +] + [[package]] name = "pyarrow" version = "17.0.0" @@ -357,6 +372,7 @@ source = { editable = "." } dependencies = [ { name = "hishel", extra = ["sqlite"] }, { name = "httpx" }, + { name = "psutil" }, { name = "pyarrow" }, { name = "pydantic" }, { name = "pydantic-core" }, @@ -372,6 +388,7 @@ dependencies = [ requires-dist = [ { name = "hishel", extras = ["sqlite"], specifier = ">=0.0.33" }, { name = "httpx", specifier = ">=0.27.2" }, + { name = "psutil", specifier = ">=6.1.0" }, { name = "pyarrow", specifier = ">=17.0.0" }, { name = "pydantic", specifier = ">=2.9.2" }, { name = "pydantic-core", specifier = ">=2.23.4" },