Add file buffering #13

Merged · 2 commits · Jul 20, 2024
3 changes: 3 additions & 0 deletions src/dandi_s3_log_parser/__init__.py
@@ -2,10 +2,13 @@

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
from ._buffered_text_reader import BufferedTextReader

__all__ = [
"DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
"IPINFO_CREDENTIALS",
"BufferedTextReader",
"get_hash_salt",
"parse_raw_s3_log",
"parse_dandi_raw_s3_log",
"parse_all_dandi_raw_s3_logs",
61 changes: 61 additions & 0 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -0,0 +1,61 @@
import pathlib


class BufferedTextReader:
def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9):
"""
Lazily read a text file into RAM using buffers of a specified size.

Parameters
----------
file_path : string or pathlib.Path
The path to the text file to be read.
maximum_ram_usage : int, default: 1 GB
The theoretical maximum amount of RAM to be used by the BufferedTextReader object.
"""
self.file_path = file_path
self.maximum_ram_usage = maximum_ram_usage

        # Read only about a third of the theoretical maximum per iteration
        # to leave room for decoding and line-splitting overhead
self.buffer_size_in_bytes = int(maximum_ram_usage / 3)
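        # For example: with the default maximum_ram_usage of 10**9 bytes (~1 GB),
        # each call to __next__ reads roughly 333 MB of raw bytes from disk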

self.total_file_size = pathlib.Path(file_path).stat().st_size
self.offset = 0

def __iter__(self):
return self

def __next__(self) -> list[str]:
"""Retrieve the next buffer from the file, or raise StopIteration if the file is exhausted."""
if self.offset >= self.total_file_size:
raise StopIteration

with open(file=self.file_path, mode="rb", buffering=0) as io:
io.seek(self.offset)
intermediate_bytes = io.read(self.buffer_size_in_bytes)
decoded_intermediate_buffer = intermediate_bytes.decode()
split_intermediate_buffer = decoded_intermediate_buffer.splitlines()

# Check if we are at the end of the file
if len(intermediate_bytes) < self.buffer_size_in_bytes:
self.offset = self.total_file_size
return split_intermediate_buffer

buffer = split_intermediate_buffer[:-1]
last_line = split_intermediate_buffer[-1]

if len(buffer) == 0 and last_line != "":
raise ValueError(
f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer "
"size! Try increasing the `buffer_size_in_bytes` to account for this line."
)

        # The last line split by the intermediate buffer may or may not be incomplete
        if decoded_intermediate_buffer.endswith("\n"):
            # By chance, this iteration finished on a clean line break, so every split line is complete
            self.offset += self.buffer_size_in_bytes
            buffer = split_intermediate_buffer
        else:
            # Rewind to the start of the incomplete final line so the next read picks it up
            self.offset += self.buffer_size_in_bytes - len(last_line.encode("utf-8"))

        return buffer
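
For orientation, here is a minimal usage sketch of the new reader; the file path and the 1 MB cap are illustrative placeholders, not part of this PR:

import dandi_s3_log_parser

# Hypothetical example path; any large text file works
example_file_path = "example_logs/2024-07-01.log"

reader = dandi_s3_log_parser.BufferedTextReader(file_path=example_file_path, maximum_ram_usage=10**6)
for buffer in reader:
    # Each buffer is a list of decoded lines; a partial trailing line rolls over into the next buffer
    for line in buffer:
        ...  # process each line here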
17 changes: 10 additions & 7 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -16,6 +16,7 @@
)
from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._buffered_text_reader import BufferedTextReader


def _get_reduced_log_lines(
@@ -50,14 +51,15 @@ def _get_reduced_log_lines(
# This dictionary is intended to be mutated throughout the process
ip_address_to_region = _load_ip_address_to_region_cache()

+    # Perform I/O read in batches to improve performance
+    resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0)
+    resolved_tqdm_kwargs.update(tqdm_kwargs)

     reduced_log_lines = list()
-    with open(file=raw_s3_log_file_path, mode="r") as io:
-        # Perform I/O read in one batch to improve performance
-        # TODO: for larger files, this loads entirely into RAM - need buffering
-        resolved_tqdm_kwargs = dict(desc="Parsing lines...", leave=False, mininterval=1.0)
-        resolved_tqdm_kwargs.update(tqdm_kwargs)
-        raw_lines = tqdm.tqdm(iterable=io.readlines(), **resolved_tqdm_kwargs)
-        for index, raw_line in enumerate(raw_lines):
+    per_buffer_index = 0
+    buffered_text_reader = BufferedTextReader(file_path=raw_s3_log_file_path)
+    for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs):
+        for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index):
_append_reduced_log_line(
raw_line=raw_line,
reduced_log_lines=reduced_log_lines,
@@ -68,6 +70,7 @@
index=index,
ip_hash_to_region=ip_address_to_region,
)
+        per_buffer_index = index + 1

_save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region)

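To illustrate the running line index that the new loop maintains across buffers, here is a small self-contained sketch; the hard-coded buffers stand in for what BufferedTextReader would yield:

# Made-up buffers standing in for BufferedTextReader output
buffers = [["line 0", "line 1", "line 2"], ["line 3", "line 4"]]

per_buffer_index = 0
for buffered_raw_lines in buffers:
    for index, raw_line in enumerate(buffered_raw_lines, start=per_buffer_index):
        print(index, raw_line)  # global indices 0 through 4 across both buffers
    per_buffer_index = index + 1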
69 changes: 69 additions & 0 deletions tests/test_buffered_text_reader.py
@@ -0,0 +1,69 @@
import pathlib
import sys

import pytest

import dandi_s3_log_parser


@pytest.fixture(scope="session")
def large_text_file_path(tmp_path_factory: pytest.TempPathFactory):
tmp_path = tmp_path_factory.mktemp("large_text_file")

    # Generate a test file ~6 MB in total size
    # Content does not matter; each line is ~60 bytes
test_file_path = tmp_path / "large_text_file.txt"
fill_string = "a" * 60 + "\n"
content = [fill_string for _ in range(10**5)]
with open(file=test_file_path, mode="w") as test_file:
test_file.writelines(content)

return test_file_path


@pytest.fixture(scope="session")
def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory):
"""For testing the ValueError case during iteration."""
tmp_path = tmp_path_factory.mktemp("single_line_text_file")

# Generate test file ~3 MB in total size, consisting of only a single line
test_file_path = tmp_path / "single_line_text_file.txt"
with open(file=test_file_path, mode="w") as test_file:
        test_file.write("a" * (3 * 10**6))

return test_file_path


def test_buffered_text_reader(large_text_file_path: pathlib.Path):
maximum_ram_usage = 10**6 # 1 MB
buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
file_path=large_text_file_path, maximum_ram_usage=maximum_ram_usage
)

assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!"

for buffer_index, buffer in enumerate(buffered_text_reader):
assert isinstance(buffer, list), "BufferedTextReader object did not load a buffer as a list!"
assert (
sys.getsizeof(buffer) <= buffered_text_reader.buffer_size_in_bytes
), "BufferedTextReader object loaded a buffer exceeding the threshold!"

assert buffer_index == 18, "BufferedTextReader object did not load the correct number of buffers!"

with pytest.raises(StopIteration):
next(buffered_text_reader)


def test_value_error(single_line_text_file_path: pathlib.Path):
maximum_ram_usage = 10**6 # 1 MB
with pytest.raises(ValueError) as error_info:
buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
file_path=single_line_text_file_path, maximum_ram_usage=maximum_ram_usage
)
next(buffered_text_reader)

expected_message = (
"BufferedTextReader encountered a line at offset 0 that exceeds the buffer size! "
"Try increasing the `buffer_size_in_bytes` to account for this line."
)
assert str(error_info.value) == expected_message