Reduce memory overhead per worker #35

Merged
merged 2 commits on Aug 14, 2024
46 changes: 11 additions & 35 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -18,9 +18,6 @@
from pydantic import DirectoryPath, Field, FilePath, validate_call

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._ip_utils import (
_get_latest_github_ip_ranges,
)
from ._s3_log_file_parser import parse_raw_s3_log


@@ -31,7 +28,6 @@ def parse_all_dandi_raw_s3_logs(
parsed_s3_log_folder_path: DirectoryPath,
excluded_log_files: list[FilePath] | None = None,
excluded_ips: collections.defaultdict[str, bool] | None = None,
exclude_github_ips: bool = True,
maximum_number_of_workers: int = Field(ge=1, default=1),
maximum_buffer_size_in_bytes: int = 4 * 10**9,
) -> None:
@@ -57,8 +53,6 @@ def parse_all_dandi_raw_s3_logs(
A list of log file paths to exclude from parsing.
excluded_ips : collections.defaultdict of strings to booleans, optional
A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
exclude_github_ips : bool, default: True
Include all GitHub action IP addresses in the `excluded_ips`.
maximum_number_of_workers : int, default: 1
The maximum number of workers to distribute tasks across.
maximum_buffer_size_in_bytes : int, default: 4 GB
@@ -74,13 +68,7 @@ def parse_all_dandi_raw_s3_logs(
excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
excluded_ips = excluded_ips or collections.defaultdict(bool)

if exclude_github_ips:
for github_ip in _get_latest_github_ip_ranges():
excluded_ips[github_ip] = True

def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
asset_id_handler = _get_default_dandi_asset_id_handler()

# The .rglob is not naturally sorted; shuffle for more uniform progress updates
daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files
@@ -98,7 +86,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=1, leave=False),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
@@ -203,10 +190,7 @@ def _multi_job_parse_dandi_raw_s3_log(
try:
error_message = ""

def asset_id_handler(*, raw_asset_id: str) -> str:
"""Apparently callables, even simple built-in ones, cannot be pickled."""
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
asset_id_handler = _get_default_dandi_asset_id_handler()

job_index = os.getpid() % maximum_number_of_workers
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
@@ -228,7 +212,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
parsed_s3_log_folder_path=per_job_temporary_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=job_index + 1, leave=False),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
@@ -245,7 +228,6 @@ def parse_dandi_raw_s3_log(
parsed_s3_log_folder_path: str | pathlib.Path,
mode: Literal["w", "a"] = "a",
excluded_ips: collections.defaultdict[str, bool] | None = None,
exclude_github_ips: bool = True,
asset_id_handler: Callable | None = None,
tqdm_kwargs: dict | None = None,
maximum_buffer_size_in_bytes: int = 4 * 10**9,
@@ -272,8 +254,6 @@ def parse_dandi_raw_s3_log(
over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
excluded_ips : collections.defaultdict of strings to booleans, optional
A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
exclude_github_ips : bool, default: True
Include all GitHub action IP addresses in the `excluded_ips`.
asset_id_handler : callable, optional
If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to
translate into nested directory paths) then define a function of the following form:
@@ -290,24 +270,12 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
"""
raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
asset_id_handler = asset_id_handler or _get_default_dandi_asset_id_handler()
tqdm_kwargs = tqdm_kwargs or dict()

bucket = "dandiarchive"
request_type = "GET"

# Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration
# Exclude GitHub actions, which are responsible for running health checks on archive which bloat the logs
excluded_ips = excluded_ips or collections.defaultdict(bool)
if exclude_github_ips:
for github_ip in _get_latest_github_ip_ranges():
excluded_ips[github_ip] = True

if asset_id_handler is None:

def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]

parse_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
@@ -319,3 +287,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
tqdm_kwargs=tqdm_kwargs,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)


def _get_default_dandi_asset_id_handler() -> Callable:
def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]

return asset_id_handler
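
Since `exclude_github_ips` is gone from both `parse_all_dandi_raw_s3_logs` and `parse_dandi_raw_s3_log`, a caller that still wants to drop GitHub Actions health-check traffic now builds the `excluded_ips` lookup itself. A minimal sketch, assuming the full signatures match the fragments visible in this diff, that `_get_latest_github_ip_ranges` remains importable from `._ip_utils`, and with hypothetical folder paths:

import collections

from dandi_s3_log_parser import parse_all_dandi_raw_s3_logs
from dandi_s3_log_parser._ip_utils import _get_latest_github_ip_ranges

# Build the IP exclusion lookup explicitly; the parser no longer constructs it internally.
excluded_ips = collections.defaultdict(bool)
for github_ip in _get_latest_github_ip_ranges():
    excluded_ips[github_ip] = True

parse_all_dandi_raw_s3_logs(
    base_raw_s3_log_folder_path="/data/raw_s3_logs",  # hypothetical path
    parsed_s3_log_folder_path="/data/parsed_s3_logs",  # hypothetical path
    excluded_ips=excluded_ips,
    maximum_number_of_workers=4,
)

The new `_get_default_dandi_asset_id_handler()` factory replaces the two nested copies of the handler (which, per the removed comment, cannot be pickled into worker processes); it keeps the first and last slash-separated components of a raw asset ID, so a hypothetical `blobs/a1b/2c3/asset-id` would become `blobs_asset-id`.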
88 changes: 41 additions & 47 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -9,7 +9,7 @@
import tqdm

from ._buffered_text_reader import BufferedTextReader
from ._s3_log_line_parser import _append_reduced_log_line, _ReducedLogLine
from ._s3_log_line_parser import _append_reduced_log_line


def parse_raw_s3_log(
@@ -67,65 +67,57 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
parsed_s3_log_folder_path.mkdir(exist_ok=True)
bucket = bucket or ""
excluded_ips = excluded_ips or collections.defaultdict(bool)
asset_id_handler = asset_id_handler or (lambda asset_id: asset_id)
tqdm_kwargs = tqdm_kwargs or dict()

reduced_logs = _get_reduced_log_lines(
assert raw_s3_log_file_path.suffix == ".log", f"`{raw_s3_log_file_path=}` should end in '.log'!"

reduced_and_binned_logs = _get_reduced_and_binned_log_lines(
raw_s3_log_file_path=raw_s3_log_file_path,
asset_id_handler=asset_id_handler,
bucket=bucket,
request_type=request_type,
excluded_ips=excluded_ips,
tqdm_kwargs=tqdm_kwargs,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)

reduced_logs_binned_by_unparsed_asset = dict()
for reduced_log in reduced_logs:
raw_asset_id = reduced_log.asset_id
reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get(
raw_asset_id,
collections.defaultdict(list),
)

reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["ip_address"].append(reduced_log.ip_address)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["line_index"].append(reduced_log.line_index)

if asset_id_handler is not None:
reduced_logs_binned_by_asset = dict()
for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items():
parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id)
for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items():
parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv"

reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset
else:
reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset

for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items():
parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv"

data_frame = pandas.DataFrame(data=reduced_logs_per_asset)
data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id)

header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True
data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)


def _get_reduced_log_lines(
def _get_reduced_and_binned_log_lines(
*,
raw_s3_log_file_path: pathlib.Path,
bucket: str | None,
asset_id_handler: Callable,
bucket: str,
request_type: Literal["GET", "PUT"],
excluded_ips: collections.defaultdict[str, bool],
tqdm_kwargs: dict | None = None,
maximum_buffer_size_in_bytes: int = 4 * 10**9,
) -> list[_ReducedLogLine]:
tqdm_kwargs: dict,
maximum_buffer_size_in_bytes: int,
) -> collections.defaultdict[str, dict[str, list[str | int]]]:
"""
Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects.
Reduce the full S3 log file to minimal content and bin by asset ID.

Parameters
----------
raw_s3_log_file_path : str or pathlib.Path
Path to the raw S3 log file.
asset_id_handler : callable, optional
If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to
translate into nested directory paths) then define a function of the following form:

# For example
def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
bucket : str
Only parse and return lines that match this bucket.
request_type : str
@@ -137,42 +129,44 @@ def _get_reduced_log_lines(
maximum_buffer_size_in_bytes : int, default: 4 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text file.
"""
assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!"

# Collapse bucket to empty string instead of asking if it is None on each iteration
bucket = "" if bucket is None else bucket
Returns
-------
reduced_and_binned_logs : collections.defaultdict
A map of all reduced log line content binned by handled asset ID.
"""
tqdm_kwargs = tqdm_kwargs or dict()

# Perform I/O read in batches to improve performance
resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0)
resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=5.0)
resolved_tqdm_kwargs.update(tqdm_kwargs)

reduced_log_lines = list()
per_buffer_index = 0
reduced_and_binned_logs = collections.defaultdict(list)
buffered_text_reader = BufferedTextReader(
file_path=raw_s3_log_file_path,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
for buffered_raw_lines in tqdm.tqdm(
progress_bar_iterator = tqdm.tqdm(
iterable=buffered_text_reader,
total=len(buffered_text_reader),
**resolved_tqdm_kwargs,
):
index = 0
for raw_line in buffered_raw_lines:
)

per_buffer_index = 0
for buffered_raw_lines in progress_bar_iterator:
for index, raw_line in enumerate(buffered_raw_lines):
line_index = per_buffer_index + index

_append_reduced_log_line(
raw_line=raw_line,
reduced_log_lines=reduced_log_lines,
reduced_and_binned_logs=reduced_and_binned_logs,
asset_id_handler=asset_id_handler,
bucket=bucket,
request_type=request_type,
excluded_ips=excluded_ips,
log_file_path=raw_s3_log_file_path,
line_index=line_index,
)
index += 1
per_buffer_index += index

return reduced_log_lines
return reduced_and_binned_logs
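
The memory reduction itself is in this file: `_get_reduced_log_lines` used to collect one `_ReducedLogLine` namedtuple per request into a flat list and bin them by asset afterward, so each worker briefly held both the list and the binned dictionary; `_get_reduced_and_binned_log_lines` instead lets `_append_reduced_log_line` bin each parsed line straight into a `collections.defaultdict` keyed by handled asset ID. A rough sketch of that binned structure (the real `_append_reduced_log_line` lives in `_s3_log_line_parser.py` and is not shown in this diff; the names below are hypothetical stand-ins):

import collections

# Outer keys are handled asset IDs; inner keys are the TSV column names written by
# `parse_raw_s3_log`, matching the annotated return type
# `collections.defaultdict[str, dict[str, list[str | int]]]`.
reduced_and_binned_logs: collections.defaultdict = collections.defaultdict(
    lambda: collections.defaultdict(list)
)


def _append_reduced_log_line_sketch(
    *,
    handled_asset_id: str,
    timestamp: str,
    bytes_sent: int,
    ip_address: str,
    line_index: int,
) -> None:
    """Hypothetical stand-in for the real helper: append one reduced line to its asset's bin."""
    asset_bin = reduced_and_binned_logs[handled_asset_id]
    asset_bin["timestamp"].append(timestamp)
    asset_bin["bytes_sent"].append(bytes_sent)
    asset_bin["ip_address"].append(ip_address)
    asset_bin["line_index"].append(line_index)


_append_reduced_log_line_sketch(
    handled_asset_id="blobs_a1b2c3",  # hypothetical handled asset ID
    timestamp="2024-08-14T01:23:45",
    bytes_sent=1024,
    ip_address="192.0.2.1",
    line_index=0,
)

Each per-asset bin is then converted to a `pandas.DataFrame` and appended to its TSV in `parse_raw_s3_log`, so the flat intermediate list never exists.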