From 382a6791c894b32ddbbeaeb3379662b4f51bf34b Mon Sep 17 00:00:00 2001
From: Cody Baker
Date: Sat, 20 Jul 2024 14:46:53 -0400
Subject: [PATCH 1/2] add text file buffering with tests

---
 src/dandi_s3_log_parser/__init__.py |  3 +
 .../_buffered_text_reader.py        | 61 ++++++++++++++++
 tests/test_buffered_text_reader.py  | 69 +++++++++++++++++++
 3 files changed, 133 insertions(+)
 create mode 100644 src/dandi_s3_log_parser/_buffered_text_reader.py
 create mode 100644 tests/test_buffered_text_reader.py

diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py
index 6eff08b..279cb4c 100644
--- a/src/dandi_s3_log_parser/__init__.py
+++ b/src/dandi_s3_log_parser/__init__.py
@@ -2,10 +2,13 @@
 
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
 from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
+from ._buffered_text_reader import BufferedTextReader
 
 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
     "IPINFO_CREDENTIALS",
+    "BufferedTextReader",
+    "get_hash_salt",
     "parse_raw_s3_log",
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
diff --git a/src/dandi_s3_log_parser/_buffered_text_reader.py b/src/dandi_s3_log_parser/_buffered_text_reader.py
new file mode 100644
index 0000000..6ebe0bf
--- /dev/null
+++ b/src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -0,0 +1,61 @@
+import pathlib
+
+
+class BufferedTextReader:
+    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9):
+        """
+        Lazily read a text file into RAM using buffers of a specified size.
+
+        Parameters
+        ----------
+        file_path : string or pathlib.Path
+            The path to the text file to be read.
+        maximum_ram_usage : int, default: 1 GB
+            The theoretical maximum amount of RAM to be used by the BufferedTextReader object.
+        """
+        self.file_path = file_path
+        self.maximum_ram_usage = maximum_ram_usage
+
+        # Read only ~1/3 of the theoretical maximum per iteration to leave room for
+        # the decoded string and the split list of lines
+        self.buffer_size_in_bytes = int(maximum_ram_usage / 3)
+
+        self.total_file_size = pathlib.Path(file_path).stat().st_size
+        self.offset = 0
+
+    def __iter__(self):
+        return self
+
+    def __next__(self) -> list[str]:
+        """Retrieve the next buffer from the file, or raise StopIteration if the file is exhausted."""
+        if self.offset >= self.total_file_size:
+            raise StopIteration
+
+        with open(file=self.file_path, mode="rb", buffering=0) as io:
+            io.seek(self.offset)
+            intermediate_bytes = io.read(self.buffer_size_in_bytes)
+        decoded_intermediate_buffer = intermediate_bytes.decode()
+        split_intermediate_buffer = decoded_intermediate_buffer.splitlines()
+
+        # Check if we are at the end of the file
+        if len(intermediate_bytes) < self.buffer_size_in_bytes:
+            self.offset = self.total_file_size
+            return split_intermediate_buffer
+
+        buffer = split_intermediate_buffer[:-1]
+        last_line = split_intermediate_buffer[-1]
+
+        if len(buffer) == 0 and last_line != "":
+            raise ValueError(
+                f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer "
+                "size! Try increasing the `buffer_size_in_bytes` to account for this line."
+            )
+
+        # The last line split by the intermediate buffer may or may not be incomplete
+        if decoded_intermediate_buffer.endswith("\n"):
+            # By chance, this iteration finished on a clean line break
+            self.offset += self.buffer_size_in_bytes
+        else:
+            self.offset += self.buffer_size_in_bytes - len(last_line.encode("utf-8"))
+
+        return buffer
diff --git a/tests/test_buffered_text_reader.py b/tests/test_buffered_text_reader.py
new file mode 100644
index 0000000..6ffaa95
--- /dev/null
+++ b/tests/test_buffered_text_reader.py
@@ -0,0 +1,69 @@
+import pathlib
+import sys
+
+import pytest
+
+import dandi_s3_log_parser
+
+
+@pytest.fixture(scope="session")
+def large_text_file_path(tmp_path_factory: pytest.TempPathFactory):
+    tmp_path = tmp_path_factory.mktemp("large_text_file")
+
+    # Generate a test file ~6 MB in total size
+    # Content does not matter, each line is ~60 bytes
+    test_file_path = tmp_path / "large_text_file.txt"
+    fill_string = "a" * 60 + "\n"
+    content = [fill_string for _ in range(10**5)]
+    with open(file=test_file_path, mode="w") as test_file:
+        test_file.writelines(content)
+
+    return test_file_path
+
+
+@pytest.fixture(scope="session")
+def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory):
+    """For testing the ValueError case during iteration."""
+    tmp_path = tmp_path_factory.mktemp("single_line_text_file")
+
+    # Generate test file ~3 MB in total size, consisting of only a single line
+    test_file_path = tmp_path / "single_line_text_file.txt"
+    with open(file=test_file_path, mode="w") as test_file:
+        test_file.write("a" * (3 * 10**6))
+
+    return test_file_path
+
+
+def test_buffered_text_reader(large_text_file_path: pathlib.Path):
+    maximum_ram_usage = 10**6  # 1 MB
+    buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
+        file_path=large_text_file_path, maximum_ram_usage=maximum_ram_usage
+    )
+
+    assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!"
+
+    for buffer_index, buffer in enumerate(buffered_text_reader):
+        assert isinstance(buffer, list), "BufferedTextReader object did not load a buffer as a list!"
+        assert (
+            sys.getsizeof(buffer) <= buffered_text_reader.buffer_size_in_bytes
+        ), "BufferedTextReader object loaded a buffer exceeding the threshold!"
+
+    assert buffer_index == 18, "BufferedTextReader object did not load the correct number of buffers!"
+
+    with pytest.raises(StopIteration):
+        next(buffered_text_reader)
+
+
+def test_value_error(single_line_text_file_path: pathlib.Path):
+    maximum_ram_usage = 10**6  # 1 MB
+    with pytest.raises(ValueError) as error_info:
+        buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
+            file_path=single_line_text_file_path, maximum_ram_usage=maximum_ram_usage
+        )
+        next(buffered_text_reader)
+
+    expected_message = (
+        "BufferedTextReader encountered a line at offset 0 that exceeds the buffer size! "
+        "Try increasing the `buffer_size_in_bytes` to account for this line."
+    )
+    assert str(error_info.value) == expected_message

From a5c478ebc30bf3abb3b233287c6f3d94fcfff7eb Mon Sep 17 00:00:00 2001
From: Cody Baker
Date: Sat, 20 Jul 2024 14:52:34 -0400
Subject: [PATCH 2/2] integrate with main runner

---
 src/dandi_s3_log_parser/_s3_log_file_parser.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 73950ae..9f75438 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -16,6 +16,7 @@
 )
 from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
+from ._buffered_text_reader import BufferedTextReader
 
 
 def _get_reduced_log_lines(
@@ -50,14 +51,15 @@ def _get_reduced_log_lines(
     # This dictionary is intended to be mutated throughout the process
     ip_address_to_region = _load_ip_address_to_region_cache()
 
+    # Perform I/O reads in batches to improve performance
+    resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0)
+    resolved_tqdm_kwargs.update(tqdm_kwargs)
+
     reduced_log_lines = list()
-    with open(file=raw_s3_log_file_path, mode="r") as io:
-        # Perform I/O read in one batch to improve performance
-        # TODO: for larger files, this loads entirely into RAM - need buffering
-        resolved_tqdm_kwargs = dict(desc="Parsing lines...", leave=False, mininterval=1.0)
-        resolved_tqdm_kwargs.update(tqdm_kwargs)
-        raw_lines = tqdm.tqdm(iterable=io.readlines(), **resolved_tqdm_kwargs)
-        for index, raw_line in enumerate(raw_lines):
+    per_buffer_index = 0
+    buffered_text_reader = BufferedTextReader(file_path=raw_s3_log_file_path)
+    for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs):
+        for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index):
             _append_reduced_log_line(
                 raw_line=raw_line,
                 reduced_log_lines=reduced_log_lines,
@@ -68,6 +70,7 @@
                 index=index,
                 ip_hash_to_region=ip_address_to_region,
             )
+        per_buffer_index = index + 1
 
     _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region)
 
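
Usage sketch (not part of either patch): a minimal example of how the BufferedTextReader introduced above is consumed, mirroring the integration in the second commit. The input path and the 100 MB cap are illustrative assumptions, not values taken from the repository.

import tqdm

from dandi_s3_log_parser import BufferedTextReader

# Hypothetical input path, for illustration only
example_log_file_path = "example_raw_s3_log.txt"

# Cap theoretical RAM usage at ~100 MB; the reader loads ~1/3 of that per buffer
reader = BufferedTextReader(file_path=example_log_file_path, maximum_ram_usage=10**8)

for buffered_lines in tqdm.tqdm(iterable=reader, desc="Parsing line buffers..."):
    # Each iteration yields a list of decoded text lines from the file
    for line in buffered_lines:
        ...  # parse each raw S3 log line here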