Improve argument names #23

Merged: 4 commits, Aug 11, 2024
Changes from all commits

src/dandi_s3_log_parser/__init__.py (3 changes: 2 additions & 1 deletion)
@@ -1,9 +1,10 @@
 """Outermost exposed imports; including global environment variables."""

 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
-from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs
+from ._s3_log_file_parser import parse_raw_s3_log
 from ._buffered_text_reader import BufferedTextReader
 from ._order_parsed_logs import order_parsed_logs
+from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
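
The DANDI-specific parsers move out of `_s3_log_file_parser` into a `_dandi_s3_log_file_parser` module (that file is not shown in the loaded diff), but all names are still re-exported at the package root, so downstream imports should be unaffected. A minimal sanity-check sketch, assuming the package is installed:

```python
# Minimal sketch: the submodule split is internal; the public surface is unchanged.
from dandi_s3_log_parser import (
    BufferedTextReader,
    parse_all_dandi_raw_s3_logs,
    parse_dandi_raw_s3_log,
    parse_raw_s3_log,
)

# The DANDI parsers should now resolve from the new submodule.
print(parse_dandi_raw_s3_log.__module__)  # dandi_s3_log_parser._dandi_s3_log_file_parser
```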

src/dandi_s3_log_parser/_buffered_text_reader.py (13 changes: 7 additions & 6 deletions)
@@ -2,23 +2,24 @@


 class BufferedTextReader:
-    def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9):
+    def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_bytes: int = 10**9):
         """
         Lazily read a text file into RAM using buffers of a specified size.

         Parameters
         ----------
         file_path : string or pathlib.Path
             The path to the text file to be read.
-        maximum_ram_usage_in_bytes : int, default: 1 GB
-            The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object.
+        maximum_buffer_size_in_bytes : int, default: 1 GB
+            The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
+            source text file.
         """
         self.file_path = file_path
-        self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes
+        self.maximum_buffer_size_in_bytes = maximum_buffer_size_in_bytes

         # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage
         # due to decoding and handling
-        self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3)
+        self.buffer_size_in_bytes = int(maximum_buffer_size_in_bytes / 3)

         self.total_file_size = pathlib.Path(file_path).stat().st_size
         self.offset = 0
@@ -48,7 +49,7 @@ def __next__(self) -> list[str]:
         if len(buffer) == 0 and last_line != "":
             raise ValueError(
                 f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer "
-                "size! Try increasing the `buffer_size_in_bytes` to account for this line."
+                "size! Try increasing the `maximum_buffer_size_in_bytes` to account for this line."
             )

         # The last line split by the intermediate buffer may or may not be incomplete
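
The rename from `maximum_ram_usage_in_bytes` to `maximum_buffer_size_in_bytes` clarifies that the cap applies to each buffer iteration rather than to the object's total footprint, and the error message now points at the actual constructor argument instead of the derived `buffer_size_in_bytes` attribute. A minimal usage sketch, assuming the reader is directly iterable (the class defines `__next__`) and using a hypothetical file name:

```python
import pathlib

from dandi_s3_log_parser import BufferedTextReader

log_file_path = pathlib.Path("example_raw_s3.log")  # hypothetical input file

# Cap theoretical RAM usage at ~100 MB per iteration; per the constructor logic,
# each read fetches about a third of that (~33 MB), leaving headroom for
# decoding and line handling.
reader = BufferedTextReader(file_path=log_file_path, maximum_buffer_size_in_bytes=10**8)

for buffered_lines in reader:  # each iteration yields a list[str] of complete lines
    for line in buffered_lines:
        ...  # process a single raw S3 log line
```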

src/dandi_s3_log_parser/_command_line_interface.py (16 changes: 6 additions & 10 deletions)
@@ -49,23 +49,19 @@
     default=None,
 )
 @click.option(
-    "--number_of_jobs",
-    help="The number of jobs to use for parallel processing.",
+    "--maximum_number_of_workers",
+    help="The maximum number of workers to distribute tasks across.",
     required=False,
-    type=int,
+    type=click.IntRange(1, os.cpu_count()),
     default=1,
 )
 def parse_all_dandi_raw_s3_logs_cli(
     base_raw_s3_log_folder_path: str,
     parsed_s3_log_folder_path: str,
     mode: Literal["w", "a"] = "a",
     excluded_ips: str | None = None,
-    number_of_jobs: int = 1,
+    maximum_number_of_workers: int = 1,
 ) -> None:
-    number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs
-    assert number_of_jobs > 0, "The number of jobs must be greater than 0."
-    assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs."
-
     split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else []
     handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
     for excluded_ip in split_excluded_ips:
@@ -76,12 +72,12 @@ def parse_all_dandi_raw_s3_logs_cli(
         parsed_s3_log_folder_path=parsed_s3_log_folder_path,
         mode=mode,
         excluded_ips=handled_excluded_ips,
-        number_of_jobs=number_of_jobs,
+        maximum_number_of_workers=maximum_number_of_workers,
     )


 # TODO
-@click.command(name="parse_dandi_raw_s3_logs")
+@click.command(name="parse_dandi_raw_s3_log")
 def parse_dandi_raw_s3_log_cli() -> None:
     parse_dandi_raw_s3_log()

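
Replacing the manual `assert` bounds checks with `type=click.IntRange(1, os.cpu_count())` moves validation into the option parser itself, at the cost of the old convention where a negative value counted back from the CPU total. A sketch of the same validation pattern on a stand-alone toy command (not the project's CLI), exercised with `click.testing.CliRunner`:

```python
import os

import click
from click.testing import CliRunner


@click.command()
@click.option(
    "--maximum_number_of_workers",
    required=False,
    type=click.IntRange(1, os.cpu_count()),  # rejects values outside [1, cpu_count]
    default=1,
)
def toy_cli(maximum_number_of_workers: int) -> None:
    click.echo(f"workers={maximum_number_of_workers}")


runner = CliRunner()
print(runner.invoke(toy_cli, ["--maximum_number_of_workers", "1"]).output)  # workers=1

# Out-of-range values now fail at parse time (usage error, exit code 2)
# instead of tripping an assert inside the function body.
print(runner.invoke(toy_cli, ["--maximum_number_of_workers", "0"]).exit_code)  # 2
```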