From 554dfd54e9d2817c9fa211675c39f295ffb92975 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD <codycbakerphd@gmail.com> Date: Wed, 14 Aug 2024 00:47:40 -0400 Subject: [PATCH] fixes --- .../_dandi_s3_log_file_parser.py | 46 +--- .../_s3_log_file_parser.py | 88 ++++---- .../_s3_log_line_parser.py | 197 +++++++++++------- 3 files changed, 168 insertions(+), 163 deletions(-) diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py index 54a12c0..6ce2d1b 100644 --- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py @@ -18,9 +18,6 @@ from pydantic import DirectoryPath, Field, FilePath, validate_call from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH -from ._ip_utils import ( - _get_latest_github_ip_ranges, -) from ._s3_log_file_parser import parse_raw_s3_log @@ -31,7 +28,6 @@ def parse_all_dandi_raw_s3_logs( parsed_s3_log_folder_path: DirectoryPath, excluded_log_files: list[FilePath] | None = None, excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, maximum_number_of_workers: int = Field(ge=1, le=os.cpu_count() * 5, default=1), maximum_buffer_size_in_bytes: int = 4 * 10**9, ) -> None: @@ -57,8 +53,6 @@ def parse_all_dandi_raw_s3_logs( A list of log file paths to exclude from parsing. excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. maximum_number_of_workers : int, default: 1 The maximum number of workers to distribute tasks across. maximum_buffer_size_in_bytes : int, default: 4 GB @@ -74,13 +68,7 @@ def parse_all_dandi_raw_s3_logs( excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files} excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True - - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + asset_id_handler = _get_default_dandi_asset_id_handler() # The .rglob is not naturally sorted; shuffle for more uniform progress updates daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files @@ -98,7 +86,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str: parsed_s3_log_folder_path=parsed_s3_log_folder_path, mode="a", excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction asset_id_handler=asset_id_handler, tqdm_kwargs=dict(position=1, leave=False), maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, @@ -203,10 +190,7 @@ def _multi_job_parse_dandi_raw_s3_log( try: error_message = "" - def asset_id_handler(*, raw_asset_id: str) -> str: - """Apparently callables, even simple built-in ones, cannot be pickled.""" - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + asset_id_handler = _get_default_dandi_asset_id_handler() job_index = os.getpid() % maximum_number_of_workers per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" @@ -228,7 +212,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str: parsed_s3_log_folder_path=per_job_temporary_folder_path, mode="a", excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction asset_id_handler=asset_id_handler, tqdm_kwargs=dict(position=job_index + 1, leave=False), maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, @@ -245,7 +228,6 @@ def parse_dandi_raw_s3_log( parsed_s3_log_folder_path: str | pathlib.Path, mode: Literal["w", "a"] = "a", excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, maximum_buffer_size_in_bytes: int = 4 * 10**9, @@ -272,8 +254,6 @@ def parse_dandi_raw_s3_log( over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. asset_id_handler : callable, optional If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to translate into nested directory paths) then define a function of the following form: @@ -290,24 +270,12 @@ def asset_id_handler(*, raw_asset_id: str) -> str: """ raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) + asset_id_handler = asset_id_handler or _get_default_dandi_asset_id_handler() tqdm_kwargs = tqdm_kwargs or dict() bucket = "dandiarchive" request_type = "GET" - # Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration - # Exclude GitHub actions, which are responsible for running health checks on archive which bloat the logs - excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True - - if asset_id_handler is None: - - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - parse_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, parsed_s3_log_folder_path=parsed_s3_log_folder_path, @@ -319,3 +287,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str: tqdm_kwargs=tqdm_kwargs, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) + + +def _get_default_dandi_asset_id_handler() -> Callable: + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + return asset_id_handler diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index c23e4b0..a108a57 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -9,7 +9,7 @@ import tqdm from ._buffered_text_reader import BufferedTextReader -from ._s3_log_line_parser import _append_reduced_log_line, _ReducedLogLine +from ._s3_log_line_parser import _append_reduced_log_line def parse_raw_s3_log( @@ -67,11 +67,16 @@ def asset_id_handler(*, raw_asset_id: str) -> str: raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) + bucket = bucket or "" excluded_ips = excluded_ips or collections.defaultdict(bool) + asset_id_handler = asset_id_handler or (lambda asset_id: asset_id) tqdm_kwargs = tqdm_kwargs or dict() - reduced_logs = _get_reduced_log_lines( + assert raw_s3_log_file_path.suffix == ".log", f"`{raw_s3_log_file_path=}` should end in '.log'!" + + reduced_and_binned_logs = _get_reduced_and_binned_log_lines( raw_s3_log_file_path=raw_s3_log_file_path, + asset_id_handler=asset_id_handler, bucket=bucket, request_type=request_type, excluded_ips=excluded_ips, @@ -79,53 +84,40 @@ def asset_id_handler(*, raw_asset_id: str) -> str: maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) - reduced_logs_binned_by_unparsed_asset = dict() - for reduced_log in reduced_logs: - raw_asset_id = reduced_log.asset_id - reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( - raw_asset_id, - collections.defaultdict(list), - ) - - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["ip_address"].append(reduced_log.ip_address) - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["line_index"].append(reduced_log.line_index) - - if asset_id_handler is not None: - reduced_logs_binned_by_asset = dict() - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): - parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items(): + parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv" - reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset - else: - reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset - - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" - - data_frame = pandas.DataFrame(data=reduced_logs_per_asset) + data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id) header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False) -def _get_reduced_log_lines( +def _get_reduced_and_binned_log_lines( *, raw_s3_log_file_path: pathlib.Path, - bucket: str | None, + asset_id_handler: Callable, + bucket: str, request_type: Literal["GET", "PUT"], excluded_ips: collections.defaultdict[str, bool], - tqdm_kwargs: dict | None = None, - maximum_buffer_size_in_bytes: int = 4 * 10**9, -) -> list[_ReducedLogLine]: + tqdm_kwargs: dict, + maximum_buffer_size_in_bytes: int, +) -> collections.defaultdict[str, dict[str, list[str | int]]]: """ - Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. + Reduce the full S3 log file to minimal content and bin by asset ID. Parameters ---------- raw_s3_log_file_path : str or pathlib.Path Path to the raw S3 log file. + asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] bucket : str Only parse and return lines that match this bucket. request_type : str @@ -137,42 +129,44 @@ def _get_reduced_log_lines( maximum_buffer_size_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the source text file. - """ - assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" - # Collapse bucket to empty string instead of asking if it is None on each iteration - bucket = "" if bucket is None else bucket + Returns + ------- + reduced_and_binned_logs : collections.defaultdict + A map of all reduced log line content binned by handled asset ID. + """ tqdm_kwargs = tqdm_kwargs or dict() # Perform I/O read in batches to improve performance - resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0) + resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=5.0) resolved_tqdm_kwargs.update(tqdm_kwargs) - reduced_log_lines = list() - per_buffer_index = 0 + reduced_and_binned_logs = collections.defaultdict(list) buffered_text_reader = BufferedTextReader( file_path=raw_s3_log_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) - for buffered_raw_lines in tqdm.tqdm( + progress_bar_iterator = tqdm.tqdm( iterable=buffered_text_reader, total=len(buffered_text_reader), **resolved_tqdm_kwargs, - ): - index = 0 - for raw_line in buffered_raw_lines: + ) + + per_buffer_index = 0 + for buffered_raw_lines in progress_bar_iterator: + for index, raw_line in enumerate(buffered_raw_lines): line_index = per_buffer_index + index _append_reduced_log_line( raw_line=raw_line, - reduced_log_lines=reduced_log_lines, + reduced_and_binned_logs=reduced_and_binned_logs, + asset_id_handler=asset_id_handler, bucket=bucket, request_type=request_type, excluded_ips=excluded_ips, log_file_path=raw_s3_log_file_path, line_index=line_index, ) - index += 1 per_buffer_index += index - return reduced_log_lines + return reduced_and_binned_logs diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index af9ef1b..a162c54 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -19,9 +19,14 @@ import importlib.metadata import pathlib import re +from collections.abc import Callable from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH +_KNOWN_REQUEST_TYPES = collections.defaultdict(bool) +for request_type in ["GET", "PUT", "HEAD"]: + _KNOWN_REQUEST_TYPES[request_type] = True + _FULL_PATTERN_TO_FIELD_MAPPING = [ "bucket_owner", "bucket", @@ -50,24 +55,130 @@ "tls_version", "access_point_arn", ] -_REDUCED_PATTERN_TO_FIELD_MAPPING = ["asset_id", "timestamp", "bytes_sent", "ip_address", "line_index"] - _FullLogLine = collections.namedtuple("FullLogLine", _FULL_PATTERN_TO_FIELD_MAPPING) -_ReducedLogLine = collections.namedtuple("ReducedLogLine", _REDUCED_PATTERN_TO_FIELD_MAPPING) _S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)') +def _append_reduced_log_line( + *, + raw_line: str, + reduced_and_binned_logs: collections.defaultdict[str, dict[str, list[str | int]]], + asset_id_handler: Callable, + bucket: str, + request_type: str, + excluded_ips: collections.defaultdict[str, bool], + line_index: int, + log_file_path: pathlib.Path, +) -> None: + """ + Append the `reduced_and_binned_logs` map with informatione extracted from a single raw log line, if it is valid. + + Parameters + ---------- + raw_line : string + A single line from the raw S3 log file. + reduced_and_binned_logs : collections.defaultdict + A map of reduced log line content binned by handled asset ID. + asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + bucket : string + Only parse and return lines that match this bucket string. + request_type : string + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + line_index: int + The index of the line in the raw log file. + log_file_path: pathlib.Path + The path to the log file being parsed; attached for logging purposes. + """ + parsed_log_line = _parse_s3_log_line(raw_line=raw_line) + + full_log_line = _get_full_log_line( + parsed_log_line=parsed_log_line, + log_file_path=log_file_path, + line_index=line_index, + raw_line=raw_line, + ) + + if full_log_line is None: + return + + # Various early skip conditions + if full_log_line.bucket != bucket: + return + + # Raise some quick parsing errors if anything indicates an improper parsing + # These might slow parsing down a bit, but could be important to ensuring accuracy + if not full_log_line.status_code.isdigit(): + message = f"Unexpected status code: '{full_log_line.status_code}' on line {line_index} of file {log_file_path}." + raise ValueError(message) + + # An expected operation string is "REST.GET.OBJECT" + operation_slice = slice(5, 8) if full_log_line.operation[8] == "." else slice(5, 9) + handled_request_type = full_log_line.operation[operation_slice] + if _KNOWN_REQUEST_TYPES[handled_request_type] is False: + message = ( + f"Unexpected request type: '{handled_request_type}' handled from '{full_log_line.operation}' " + f"on line {line_index} of file {log_file_path}." + ) + raise ValueError(message) + + timezone = full_log_line.timestamp[-5:] != "+0000" + if timezone: + message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`." + raise ValueError(message) + + # More early skip conditions + # Only accept 200-block status codes + if full_log_line.status_code[0] != "2": + return + + if handled_request_type != request_type: + return + + if excluded_ips[full_log_line.ip_address] is True: + return + + # All early skip conditions done; the line is parsed so bin the reduced information by handled asset ID + handled_asset_id = asset_id_handler(raw_asset_id=full_log_line.asset_id) + handled_timestamp = datetime.datetime.strptime(full_log_line.timestamp[:-6], "%d/%b/%Y:%H:%M:%S") + handled_bytes_sent = int(full_log_line.bytes_sent) if full_log_line.bytes_sent != "-" else 0 + + reduced_and_binned_logs[handled_asset_id] = reduced_and_binned_logs.get( + handled_asset_id, + collections.defaultdict(list), + ) + reduced_and_binned_logs[handled_asset_id]["timestamp"].append(handled_timestamp) + reduced_and_binned_logs[handled_asset_id]["bytes_sent"].append(handled_bytes_sent) + reduced_and_binned_logs[handled_asset_id]["ip_address"].append(full_log_line.ip_address) + reduced_and_binned_logs[handled_asset_id]["line_index"].append(line_index) + + def _find_all_possible_substring_indices(*, string: str, substring: str) -> list[int]: indices = list() start = 0 - while True: + max_iter = 1000 + while True and start < max_iter: next_index = string.find(substring, start) if next_index == -1: # .find(...) was unable to locate the substring break indices.append(next_index) start = next_index + 1 + if start >= max_iter: + message = ( + f"Exceeded maximum iterations in `_find_all_possible_substring_indices` on `{string=}` with `{substring=}`." + ) + raise StopIteration(message) + return indices @@ -150,84 +261,8 @@ def _get_full_log_line( date = datetime.datetime.now().strftime("%y%m%d") lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt" + # TODO: automatically attempt to anonymize any detectable IP address in the raw line by replacing with 192.0.2.0 with open(file=lines_errors_file_path, mode="a") as io: io.write(f"Line {line_index} of {log_file_path} (parsed {number_of_parsed_items} items): {raw_line}\n\n") return full_log_line - - -def _append_reduced_log_line( - *, - raw_line: str, - reduced_log_lines: list[_ReducedLogLine], - bucket: str, - request_type: str, - excluded_ips: collections.defaultdict[str, bool], - line_index: int, - log_file_path: pathlib.Path, -) -> None: - """ - Append the `reduced_log_lines` list with a ReducedLogLine constructed from a single raw log line, if it is valid. - - Parameters - ---------- - raw_line : string - A single line from the raw S3 log file. - reduced_log_lines : list of ReducedLogLine - The list of ReducedLogLine objects to mutate in place. - This is done to reduce overhead of copying/returning items in-memory via a return-based approach. - bucket : string - Only parse and return lines that match this bucket string. - request_type : string - The type of request to filter for. - excluded_ips : collections.defaultdict of strings to booleans - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - line_index: int - The index of the line in the raw log file. - """ - bucket = "" if bucket is None else bucket - excluded_ips = excluded_ips or collections.defaultdict(bool) - - parsed_log_line = _parse_s3_log_line(raw_line=raw_line) - - full_log_line = _get_full_log_line( - parsed_log_line=parsed_log_line, - log_file_path=log_file_path, - line_index=line_index, - raw_line=raw_line, - ) - - if full_log_line is None: - return - - # Various early skip conditions - if full_log_line.bucket != bucket: - return - - # Skip all non-success status codes (those in the 200 block) - if full_log_line.status_code[0] != "2": - return - - # Derived from operation string, e.g., "REST.GET.OBJECT" - parsed_request_type = full_log_line.operation.split(".")[1] - if parsed_request_type != request_type: - return - - if excluded_ips[full_log_line.ip_address] is True: - return - - assert ( - full_log_line.timestamp[-5:] == "+0000" - ), f"Unexpected time shift attached to log! Have always seen '+0000', found '{full_log_line.timestamp[-5:]}'." - - parsed_timestamp = datetime.datetime.strptime(full_log_line.timestamp[:-6], "%d/%b/%Y:%H:%M:%S") - parsed_bytes_sent = int(full_log_line.bytes_sent) if full_log_line.bytes_sent != "-" else 0 - reduced_log_line = _ReducedLogLine( - asset_id=full_log_line.asset_id, - timestamp=parsed_timestamp, - bytes_sent=parsed_bytes_sent, - ip_address=full_log_line.ip_address, - line_index=line_index, - ) - - reduced_log_lines.append(reduced_log_line)