Commit
fixes (#35)
Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD and CodyCBakerPhD authored Aug 14, 2024
1 parent ee06156 commit b86be2f
Showing 3 changed files with 168 additions and 163 deletions.
46 changes: 11 additions & 35 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -18,9 +18,6 @@
from pydantic import DirectoryPath, Field, FilePath, validate_call

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._ip_utils import (
_get_latest_github_ip_ranges,
)
from ._s3_log_file_parser import parse_raw_s3_log


@@ -31,7 +28,6 @@ def parse_all_dandi_raw_s3_logs(
parsed_s3_log_folder_path: DirectoryPath,
excluded_log_files: list[FilePath] | None = None,
excluded_ips: collections.defaultdict[str, bool] | None = None,
exclude_github_ips: bool = True,
maximum_number_of_workers: int = Field(ge=1, default=1),
maximum_buffer_size_in_bytes: int = 4 * 10**9,
) -> None:
@@ -57,8 +53,6 @@ def parse_all_dandi_raw_s3_logs(
A list of log file paths to exclude from parsing.
excluded_ips : collections.defaultdict of strings to booleans, optional
A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
exclude_github_ips : bool, default: True
Include all GitHub action IP addresses in the `excluded_ips`.
maximum_number_of_workers : int, default: 1
The maximum number of workers to distribute tasks across.
maximum_buffer_size_in_bytes : int, default: 4 GB
@@ -74,13 +68,7 @@
excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
excluded_ips = excluded_ips or collections.defaultdict(bool)

if exclude_github_ips:
for github_ip in _get_latest_github_ip_ranges():
excluded_ips[github_ip] = True

def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
asset_id_handler = _get_default_dandi_asset_id_handler()

# The .rglob is not naturally sorted; shuffle for more uniform progress updates
daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files
@@ -98,7 +86,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=1, leave=False),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
@@ -203,10 +190,7 @@ def _multi_job_parse_dandi_raw_s3_log(
try:
error_message = ""

def asset_id_handler(*, raw_asset_id: str) -> str:
"""Apparently callables, even simple built-in ones, cannot be pickled."""
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
asset_id_handler = _get_default_dandi_asset_id_handler()

job_index = os.getpid() % maximum_number_of_workers
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
@@ -228,7 +212,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
parsed_s3_log_folder_path=per_job_temporary_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=job_index + 1, leave=False),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
@@ -245,7 +228,6 @@ def parse_dandi_raw_s3_log(
parsed_s3_log_folder_path: str | pathlib.Path,
mode: Literal["w", "a"] = "a",
excluded_ips: collections.defaultdict[str, bool] | None = None,
exclude_github_ips: bool = True,
asset_id_handler: Callable | None = None,
tqdm_kwargs: dict | None = None,
maximum_buffer_size_in_bytes: int = 4 * 10**9,
@@ -272,8 +254,6 @@ def parse_dandi_raw_s3_log(
over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
excluded_ips : collections.defaultdict of strings to booleans, optional
A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
exclude_github_ips : bool, default: True
Include all GitHub action IP addresses in the `excluded_ips`.
asset_id_handler : callable, optional
If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to
translate into nested directory paths) then define a function of the following form:
@@ -290,24 +270,12 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
"""
raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
asset_id_handler = asset_id_handler or _get_default_dandi_asset_id_handler()
tqdm_kwargs = tqdm_kwargs or dict()

bucket = "dandiarchive"
request_type = "GET"

# Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration
# Exclude GitHub actions, which are responsible for running health checks on archive which bloat the logs
excluded_ips = excluded_ips or collections.defaultdict(bool)
if exclude_github_ips:
for github_ip in _get_latest_github_ip_ranges():
excluded_ips[github_ip] = True

if asset_id_handler is None:

def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]

parse_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
Expand All @@ -319,3 +287,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
tqdm_kwargs=tqdm_kwargs,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)


def _get_default_dandi_asset_id_handler() -> Callable:
def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]

return asset_id_handler
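
The refactor above replaces the repeated inline asset_id_handler closures with the shared _get_default_dandi_asset_id_handler() factory. As a minimal sketch of what that default handler does (the raw asset ID below is a made-up example, not taken from the diff):

    # Minimal sketch, assuming the default handler behavior shown in the diff;
    # the raw asset ID value is hypothetical.
    raw_asset_id = "blobs/1a2/b3c/some-asset-key"
    split_by_slash = raw_asset_id.split("/")
    handled_asset_id = split_by_slash[0] + "_" + split_by_slash[-1]
    print(handled_asset_id)  # -> "blobs_some-asset-key"

The flattened form is what later becomes the per-asset TSV file name in the parsed output folder.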
88 changes: 41 additions & 47 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -9,7 +9,7 @@
import tqdm

from ._buffered_text_reader import BufferedTextReader
from ._s3_log_line_parser import _append_reduced_log_line, _ReducedLogLine
from ._s3_log_line_parser import _append_reduced_log_line


def parse_raw_s3_log(
@@ -67,65 +67,57 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
parsed_s3_log_folder_path.mkdir(exist_ok=True)
bucket = bucket or ""
excluded_ips = excluded_ips or collections.defaultdict(bool)
asset_id_handler = asset_id_handler or (lambda asset_id: asset_id)
tqdm_kwargs = tqdm_kwargs or dict()

reduced_logs = _get_reduced_log_lines(
assert raw_s3_log_file_path.suffix == ".log", f"`{raw_s3_log_file_path=}` should end in '.log'!"

reduced_and_binned_logs = _get_reduced_and_binned_log_lines(
raw_s3_log_file_path=raw_s3_log_file_path,
asset_id_handler=asset_id_handler,
bucket=bucket,
request_type=request_type,
excluded_ips=excluded_ips,
tqdm_kwargs=tqdm_kwargs,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)

reduced_logs_binned_by_unparsed_asset = dict()
for reduced_log in reduced_logs:
raw_asset_id = reduced_log.asset_id
reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get(
raw_asset_id,
collections.defaultdict(list),
)

reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["ip_address"].append(reduced_log.ip_address)
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["line_index"].append(reduced_log.line_index)

if asset_id_handler is not None:
reduced_logs_binned_by_asset = dict()
for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items():
parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id)
for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items():
parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv"

reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset
else:
reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset

for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items():
parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv"

data_frame = pandas.DataFrame(data=reduced_logs_per_asset)
data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id)

header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True
data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)


def _get_reduced_log_lines(
def _get_reduced_and_binned_log_lines(
*,
raw_s3_log_file_path: pathlib.Path,
bucket: str | None,
asset_id_handler: Callable,
bucket: str,
request_type: Literal["GET", "PUT"],
excluded_ips: collections.defaultdict[str, bool],
tqdm_kwargs: dict | None = None,
maximum_buffer_size_in_bytes: int = 4 * 10**9,
) -> list[_ReducedLogLine]:
tqdm_kwargs: dict,
maximum_buffer_size_in_bytes: int,
) -> collections.defaultdict[str, dict[str, list[str | int]]]:
"""
Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects.
Reduce the full S3 log file to minimal content and bin by asset ID.
Parameters
----------
raw_s3_log_file_path : str or pathlib.Path
Path to the raw S3 log file.
asset_id_handler : callable, optional
If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to
translate into nested directory paths) then define a function of the following form:
# For example
def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
bucket : str
Only parse and return lines that match this bucket.
request_type : str
@@ -137,42 +129,44 @@ def _get_reduced_log_lines(
maximum_buffer_size_in_bytes : int, default: 4 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text file.
"""
assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!"
# Collapse bucket to empty string instead of asking if it is None on each iteration
bucket = "" if bucket is None else bucket
Returns
-------
reduced_and_binned_logs : collections.defaultdict
A map of all reduced log line content binned by handled asset ID.
"""
tqdm_kwargs = tqdm_kwargs or dict()

# Perform I/O read in batches to improve performance
resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0)
resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=5.0)
resolved_tqdm_kwargs.update(tqdm_kwargs)

reduced_log_lines = list()
per_buffer_index = 0
reduced_and_binned_logs = collections.defaultdict(list)
buffered_text_reader = BufferedTextReader(
file_path=raw_s3_log_file_path,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
for buffered_raw_lines in tqdm.tqdm(
progress_bar_iterator = tqdm.tqdm(
iterable=buffered_text_reader,
total=len(buffered_text_reader),
**resolved_tqdm_kwargs,
):
index = 0
for raw_line in buffered_raw_lines:
)

per_buffer_index = 0
for buffered_raw_lines in progress_bar_iterator:
for index, raw_line in enumerate(buffered_raw_lines):
line_index = per_buffer_index + index

_append_reduced_log_line(
raw_line=raw_line,
reduced_log_lines=reduced_log_lines,
reduced_and_binned_logs=reduced_and_binned_logs,
asset_id_handler=asset_id_handler,
bucket=bucket,
request_type=request_type,
excluded_ips=excluded_ips,
log_file_path=raw_s3_log_file_path,
line_index=line_index,
)
index += 1
per_buffer_index += index

return reduced_log_lines
return reduced_and_binned_logs
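
For orientation, a minimal sketch of the mapping shape that _get_reduced_and_binned_log_lines is expected to return, inferred from the columns used by the previous binning code (timestamp, bytes_sent, ip_address, line_index); all values below are placeholders:

    # Assumed structure only; keys and values are illustrative.
    reduced_and_binned_logs = {
        "blobs_some-asset-key": {
            "timestamp": ["2024-08-14 00:00:01"],
            "bytes_sent": [1024],
            "ip_address": ["192.0.2.1"],
            "line_index": [0],
        },
    }

Each inner dictionary maps directly onto pandas.DataFrame(data=...) in parse_raw_s3_log before being appended to the corresponding per-asset TSV.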