
Add file buffering (#13)
* add text file buffering with tests

* integrate with main runner
CodyCBakerPhD authored Jul 20, 2024
1 parent 9c5ee6a commit f39a0e5
Showing 4 changed files with 143 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/dandi_s3_log_parser/__init__.py
@@ -2,10 +2,13 @@

 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
 from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
+from ._buffered_text_reader import BufferedTextReader

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
     "IPINFO_CREDENTIALS",
+    "BufferedTextReader",
     "get_hash_salt",
     "parse_raw_s3_log",
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
61 changes: 61 additions & 0 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -0,0 +1,61 @@
import pathlib


class BufferedTextReader:
    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9):
        """
        Lazily read a text file into RAM using buffers of a specified size.

        Parameters
        ----------
        file_path : string or pathlib.Path
            The path to the text file to be read.
        maximum_ram_usage : int, default: 1 GB
            The theoretical maximum amount of RAM to be used by the BufferedTextReader object.
        """
        self.file_path = file_path
        self.maximum_ram_usage = maximum_ram_usage

        # The actual number of bytes to read per iteration is one third of the theoretical maximum usage
        # to leave room for decoding and handling overhead
        self.buffer_size_in_bytes = int(maximum_ram_usage / 3)

        self.total_file_size = pathlib.Path(file_path).stat().st_size
        self.offset = 0

    def __iter__(self):
        return self

    def __next__(self) -> list[str]:
        """Retrieve the next buffer from the file, or raise StopIteration if the file is exhausted."""
        if self.offset >= self.total_file_size:
            raise StopIteration

        with open(file=self.file_path, mode="rb", buffering=0) as io:
            io.seek(self.offset)
            intermediate_bytes = io.read(self.buffer_size_in_bytes)
        decoded_intermediate_buffer = intermediate_bytes.decode()
        split_intermediate_buffer = decoded_intermediate_buffer.splitlines()

        # Check if we are at the end of the file
        if len(intermediate_bytes) < self.buffer_size_in_bytes:
            self.offset = self.total_file_size
            return split_intermediate_buffer

        buffer = split_intermediate_buffer[:-1]
        last_line = split_intermediate_buffer[-1]

        if len(buffer) == 0 and last_line != "":
            raise ValueError(
                f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer "
                "size! Try increasing the `buffer_size_in_bytes` to account for this line."
            )

        if decoded_intermediate_buffer.endswith("\n"):
            # By chance, this iteration finished on a clean line break, so the trailing line is complete
            # and must be kept; advance the offset by the full buffer size
            self.offset += self.buffer_size_in_bytes
            return split_intermediate_buffer

        # The trailing line was split by the buffer boundary and is incomplete; rewind the offset to its
        # start so the next buffer re-reads it in full
        self.offset += self.buffer_size_in_bytes - len(last_line.encode("utf-8"))
        return buffer
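
For orientation, a minimal usage sketch of the reader added above (not part of the commit; the log path is hypothetical, and maximum_ram_usage mirrors the value used in the tests below):

import dandi_s3_log_parser

reader = dandi_s3_log_parser.BufferedTextReader(
    file_path="/path/to/raw_s3.log", maximum_ram_usage=10**6
)
for buffer in reader:
    # Each buffer is a list of complete lines; no line is ever split across buffers
    for line in buffer:
        ...  # process a single line without holding the whole file in RAM
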
17 changes: 10 additions & 7 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -16,6 +16,7 @@
 )
 from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
+from ._buffered_text_reader import BufferedTextReader


 def _get_reduced_log_lines(
@@ -50,14 +51,15 @@ def _get_reduced_log_lines(
     # This dictionary is intended to be mutated throughout the process
     ip_address_to_region = _load_ip_address_to_region_cache()

+    # Perform I/O read in batches to improve performance
+    resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0)
+    resolved_tqdm_kwargs.update(tqdm_kwargs)
+
     reduced_log_lines = list()
-    with open(file=raw_s3_log_file_path, mode="r") as io:
-        # Perform I/O read in one batch to improve performance
-        # TODO: for larger files, this loads entirely into RAM - need buffering
-        resolved_tqdm_kwargs = dict(desc="Parsing lines...", leave=False, mininterval=1.0)
-        resolved_tqdm_kwargs.update(tqdm_kwargs)
-        raw_lines = tqdm.tqdm(iterable=io.readlines(), **resolved_tqdm_kwargs)
-        for index, raw_line in enumerate(raw_lines):
+    per_buffer_index = 0
+    buffered_text_reader = BufferedTextReader(file_path=raw_s3_log_file_path)
+    for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs):
+        for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index):
             _append_reduced_log_line(
                 raw_line=raw_line,
                 reduced_log_lines=reduced_log_lines,
@@ -68,6 +70,7 @@
                 index=index,
                 ip_hash_to_region=ip_address_to_region,
             )
+        per_buffer_index = index + 1

     _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region)
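
To make the cross-buffer line indexing above concrete, a small self-contained sketch of the same enumerate(start=...) pattern on plain lists (illustrative data, not repository code):

buffers = [["line a", "line b"], ["line c", "line d", "line e"]]
per_buffer_index = 0
for buffered_raw_lines in buffers:
    for index, raw_line in enumerate(buffered_raw_lines, start=per_buffer_index):
        print(index, raw_line)  # prints global indices 0 through 4, with no repeats or gaps
    per_buffer_index = index + 1  # the next buffer continues where this one stopped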

69 changes: 69 additions & 0 deletions tests/test_buffered_text_reader.py
@@ -0,0 +1,69 @@
import pathlib
import sys

import pytest

import dandi_s3_log_parser


@pytest.fixture(scope="session")
def large_text_file_path(tmp_path_factory: pytest.TempPathFactory):
    tmp_path = tmp_path_factory.mktemp("large_text_file")

    # Generate a test file ~6 MB in total size
    # Content does not matter; each line is ~60 bytes
    test_file_path = tmp_path / "large_text_file.txt"
    fill_string = "a" * 60 + "\n"
    content = [fill_string for _ in range(10**5)]
    with open(file=test_file_path, mode="w") as test_file:
        test_file.writelines(content)

    return test_file_path


@pytest.fixture(scope="session")
def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory):
    """For testing the ValueError case during iteration."""
    tmp_path = tmp_path_factory.mktemp("single_line_text_file")

    # Generate a test file ~3 MB in total size, consisting of only a single line
    test_file_path = tmp_path / "single_line_text_file.txt"
    with open(file=test_file_path, mode="w") as test_file:
        test_file.write("a" * (3 * 10**6))

    return test_file_path


def test_buffered_text_reader(large_text_file_path: pathlib.Path):
    maximum_ram_usage = 10**6  # 1 MB
    buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
        file_path=large_text_file_path, maximum_ram_usage=maximum_ram_usage
    )

    assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!"

    for buffer_index, buffer in enumerate(buffered_text_reader):
        assert isinstance(buffer, list), "BufferedTextReader object did not load a buffer as a list!"
        assert (
            sys.getsizeof(buffer) <= buffered_text_reader.buffer_size_in_bytes
        ), "BufferedTextReader object loaded a buffer exceeding the threshold!"

    assert buffer_index == 18, "BufferedTextReader object did not load the correct number of buffers!"

    with pytest.raises(StopIteration):
        next(buffered_text_reader)


def test_value_error(single_line_text_file_path: pathlib.Path):
    maximum_ram_usage = 10**6  # 1 MB
    with pytest.raises(ValueError) as error_info:
        buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
            file_path=single_line_text_file_path, maximum_ram_usage=maximum_ram_usage
        )
        next(buffered_text_reader)

    expected_message = (
        "BufferedTextReader encountered a line at offset 0 that exceeds the buffer size! "
        "Try increasing the `buffer_size_in_bytes` to account for this line."
    )
    assert str(error_info.value) == expected_message
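
As a worked check of the offset-rewind arithmetic, a further test along these lines could be added (a sketch assuming the reader behavior above; tmp_path is pytest's built-in per-test directory fixture):

def test_offset_rewind(tmp_path: pathlib.Path):
    # Three 6-byte lines; maximum_ram_usage=30 gives a 10-byte buffer,
    # so the first read ends mid-line ("abcde\nfghi") and must rewind 4 bytes
    file_path = tmp_path / "tiny.txt"
    file_path.write_text("abcde\nfghij\nklmno\n")

    buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(file_path=file_path, maximum_ram_usage=30)
    assert list(buffered_text_reader) == [["abcde"], ["fghij"], ["klmno"]]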
