From 7d6c78922fd3f23dd7514a01bc2a30260a76fb67 Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Sun, 11 Aug 2024 13:09:09 -0400 Subject: [PATCH] Improve argument names (#23) * various reorganizations, renames, and improvements * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * propagate CLI; generalize status codes --------- Co-authored-by: CodyCBakerPhD Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- src/dandi_s3_log_parser/__init__.py | 3 +- .../_buffered_text_reader.py | 13 +- .../_command_line_interface.py | 16 +- .../_dandi_s3_log_file_parser.py | 334 ++++++++++++++++++ .../_s3_log_file_parser.py | 330 +---------------- .../_s3_log_line_parser.py | 6 +- tests/test_buffered_text_reader.py | 10 +- tests/test_dandi_s3_log_parser.py | 2 +- 8 files changed, 367 insertions(+), 347 deletions(-) create mode 100644 src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py index ec3f54f..1b5bb2d 100644 --- a/src/dandi_s3_log_parser/__init__.py +++ b/src/dandi_s3_log_parser/__init__.py @@ -1,9 +1,10 @@ """Outermost exposed imports; including global environment variables.""" from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt -from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs +from ._s3_log_file_parser import parse_raw_s3_log from ._buffered_text_reader import BufferedTextReader from ._order_parsed_logs import order_parsed_logs +from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs __all__ = [ "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH", diff --git a/src/dandi_s3_log_parser/_buffered_text_reader.py b/src/dandi_s3_log_parser/_buffered_text_reader.py index df175cc..6039fe9 100644 --- a/src/dandi_s3_log_parser/_buffered_text_reader.py +++ b/src/dandi_s3_log_parser/_buffered_text_reader.py @@ -2,7 +2,7 @@ class BufferedTextReader: - def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9): + def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_bytes: int = 10**9): """ Lazily read a text file into RAM using buffers of a specified size. @@ -10,15 +10,16 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: ---------- file_path : string or pathlib.Path The path to the text file to be read. - maximum_ram_usage_in_bytes : int, default: 1 GB - The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object. + maximum_buffer_size_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text file. """ self.file_path = file_path - self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes + self.maximum_buffer_size_in_bytes = maximum_buffer_size_in_bytes # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage # due to decoding and handling - self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3) + self.buffer_size_in_bytes = int(maximum_buffer_size_in_bytes / 3) self.total_file_size = pathlib.Path(file_path).stat().st_size self.offset = 0 @@ -48,7 +49,7 @@ def __next__(self) -> list[str]: if len(buffer) == 0 and last_line != "": raise ValueError( f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer " - "size! Try increasing the `buffer_size_in_bytes` to account for this line." + "size! Try increasing the `maximum_buffer_size_in_bytes` to account for this line." ) # The last line split by the intermediate buffer may or may not be incomplete diff --git a/src/dandi_s3_log_parser/_command_line_interface.py b/src/dandi_s3_log_parser/_command_line_interface.py index d62a8d1..0d90acd 100644 --- a/src/dandi_s3_log_parser/_command_line_interface.py +++ b/src/dandi_s3_log_parser/_command_line_interface.py @@ -49,10 +49,10 @@ default=None, ) @click.option( - "--number_of_jobs", - help="The number of jobs to use for parallel processing.", + "--maximum_number_of_workers", + help="The maximum number of workers to distribute tasks across.", required=False, - type=int, + type=click.IntRange(1, os.cpu_count()), default=1, ) def parse_all_dandi_raw_s3_logs_cli( @@ -60,12 +60,8 @@ def parse_all_dandi_raw_s3_logs_cli( parsed_s3_log_folder_path: str, mode: Literal["w", "a"] = "a", excluded_ips: str | None = None, - number_of_jobs: int = 1, + maximum_number_of_workers: int = 1, ) -> None: - number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs - assert number_of_jobs > 0, "The number of jobs must be greater than 0." - assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs." - split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else [] handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None for excluded_ip in split_excluded_ips: @@ -76,12 +72,12 @@ def parse_all_dandi_raw_s3_logs_cli( parsed_s3_log_folder_path=parsed_s3_log_folder_path, mode=mode, excluded_ips=handled_excluded_ips, - number_of_jobs=number_of_jobs, + maximum_number_of_workers=maximum_number_of_workers, ) # TODO -@click.command(name="parse_dandi_raw_s3_logs") +@click.command(name="parse_dandi_raw_s3_log") def parse_dandi_raw_s3_log_cli() -> None: parse_dandi_raw_s3_log() diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py new file mode 100644 index 0000000..c00c5e4 --- /dev/null +++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py @@ -0,0 +1,334 @@ +"""Primary functions for parsing raw S3 log file for DANDI.""" + +import collections +import datetime +import pathlib +import os +import shutil +import traceback +import uuid +from concurrent.futures import ProcessPoolExecutor, as_completed +from typing import Callable, Literal +import importlib.metadata + +import pandas +from pydantic import Field, validate_call +import tqdm + +from ._ip_utils import ( + _get_latest_github_ip_ranges, +) +from ._s3_log_file_parser import parse_raw_s3_log +from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH +from ._order_parsed_logs import order_parsed_logs + + +@validate_call +def parse_all_dandi_raw_s3_logs( + *, + base_raw_s3_log_folder_path: str | pathlib.Path, + parsed_s3_log_folder_path: str | pathlib.Path, + excluded_ips: collections.defaultdict[str, bool] | None = None, + exclude_github_ips: bool = True, + maximum_number_of_workers: int = Field(ge=1, le=os.cpu_count(), default=1), + maximum_buffer_size_in_bytes: int = 4 * 10**9, +) -> None: + """ + Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. + + Assumes the following folder structure... + + |- + |-- 2019 (year) + |--- 01 (month) + |---- 01.log (day) + | ... + + Parameters + ---------- + base_raw_s3_log_folder_path : string or pathlib.Path + Path to the folder containing the raw S3 log files. + parsed_s3_log_folder_path : string or pathlib.Path + Path to write each parsed S3 log file to. + There will be one file per handled asset ID. + excluded_ips : collections.defaultdict of strings to booleans, optional + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + exclude_github_ips : bool, default: True + Include all GitHub action IP addresses in the `excluded_ips`. + maximum_number_of_workers : int, default: 1 + The maximum number of workers to distribute tasks across. + maximum_buffer_size_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text files. + Actual total RAM usage will be higher due to overhead and caching. + Automatically splits this total amount over the maximum number of workers if `maximum_number_of_workers` is + greater than one. + """ + base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) + parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) + parsed_s3_log_folder_path.mkdir(exist_ok=True) + + # Create a fresh temporary directory in the home folder and then fresh subfolders for each job + temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" + temporary_base_folder_path.mkdir(exist_ok=True) + + # Clean up any previous tasks that failed to clean themselves up + for previous_task_folder_path in temporary_base_folder_path.iterdir(): + shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) + + task_id = str(uuid.uuid4())[:5] + temporary_folder_path = temporary_base_folder_path / task_id + temporary_folder_path.mkdir(exist_ok=True) + + temporary_output_folder_path = temporary_folder_path / "output" + temporary_output_folder_path.mkdir(exist_ok=True) + + # Re-define some top-level pass-through items here to avoid repeated constructions + excluded_ips = excluded_ips or collections.defaultdict(bool) + if exclude_github_ips: + for github_ip in _get_latest_github_ip_ranges(): + excluded_ips[github_ip] = True + + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + daily_raw_s3_log_file_paths = list(base_raw_s3_log_folder_path.rglob(pattern="*.log")) + + if maximum_number_of_workers == 1: + for raw_s3_log_file_path in tqdm.tqdm( + iterable=daily_raw_s3_log_file_paths, + desc="Parsing log files...", + position=0, + leave=True, + ): + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=temporary_output_folder_path, + mode="a", + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=1, leave=False), + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, + order_results=False, # Will immediately reorder all files at the end + ) + else: + per_job_temporary_folder_paths = list() + for job_index in range(maximum_number_of_workers): + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + per_job_temporary_folder_path.mkdir(exist_ok=True) + per_job_temporary_folder_paths.append(per_job_temporary_folder_path) + + maximum_buffer_size_in_bytes_per_job = maximum_buffer_size_in_bytes // maximum_number_of_workers + + futures = [] + with ProcessPoolExecutor(max_workers=maximum_number_of_workers) as executor: + for raw_s3_log_file_path in daily_raw_s3_log_file_paths: + futures.append( + executor.submit( + _multi_job_parse_dandi_raw_s3_log, + maximum_number_of_workers=maximum_number_of_workers, + raw_s3_log_file_path=raw_s3_log_file_path, + temporary_folder_path=temporary_folder_path, + excluded_ips=excluded_ips, + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_job, + ) + ) + + progress_bar_iterable = tqdm.tqdm( + iterable=as_completed(futures), + desc=f"Parsing log files using {maximum_number_of_workers} jobs...", + total=len(daily_raw_s3_log_file_paths), + position=0, + leave=True, + ) + for future in progress_bar_iterable: + future.result() # This is the call that finally triggers the deployment to the workers + + print("\n\nParallel parsing complete!\n\n") + + for per_job_temporary_folder_path in tqdm.tqdm( + iterable=per_job_temporary_folder_paths, + desc="Merging results across jobs...", + total=len(per_job_temporary_folder_paths), + position=0, + leave=True, + ): + per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) + assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!" + + for per_job_parsed_s3_log_file_path in tqdm.tqdm( + iterable=per_job_parsed_s3_log_file_paths, + desc="Merging results per job...", + total=len(per_job_parsed_s3_log_file_paths), + position=1, + leave=False, + mininterval=1.0, + ): + merged_temporary_file_path = temporary_output_folder_path / per_job_parsed_s3_log_file_path.name + + parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path) + + header = False if merged_temporary_file_path.exists() else True + parsed_s3_log.to_csv( + path_or_buf=merged_temporary_file_path, mode="a", sep="\t", header=header, index=False + ) + + # Always apply this step at the end to be sure we maintained chronological order + # (even if you think order of iteration itself was performed chronologically) + # This step also adds the index counter to the TSV + order_parsed_logs( + unordered_parsed_s3_log_folder_path=temporary_output_folder_path, + ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, + ) + + shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True) + + return None + + +def _multi_job_parse_dandi_raw_s3_log( + *, + maximum_number_of_workers: int, + raw_s3_log_file_path: pathlib.Path, + temporary_folder_path: pathlib.Path, + excluded_ips: collections.defaultdict[str, bool] | None, + maximum_buffer_size_in_bytes: int, +) -> None: + """ + A mostly pass-through function to calculate the job index on the worker and target the correct subfolder. + + Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe) + to a log file. + """ + + try: + error_message = "" + + def asset_id_handler(*, raw_asset_id: str) -> str: + """Apparently callables, even simple built-in ones, cannot be pickled.""" + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + job_index = os.getpid() % maximum_number_of_workers + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + + # Define error catching stuff as part of the try clause + # so that if there is a problem within that, it too is caught + errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" + errors_folder_path.mkdir(exist_ok=True) + + dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") + date = datetime.datetime.now().strftime("%y%m%d") + parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt" + error_message += ( + f"Job index {job_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n" + ) + + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=per_job_temporary_folder_path, + mode="a", + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=job_index + 1, leave=False), + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, + order_results=False, # Always disable this for parallel processing + ) + except Exception as exception: + with open(file=parallel_errors_file_path, mode="a") as io: + error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n" + io.write(error_message) + + return None + + +def parse_dandi_raw_s3_log( + *, + raw_s3_log_file_path: str | pathlib.Path, + parsed_s3_log_folder_path: str | pathlib.Path, + mode: Literal["w", "a"] = "a", + excluded_ips: collections.defaultdict[str, bool] | None = None, + exclude_github_ips: bool = True, + asset_id_handler: Callable | None = None, + tqdm_kwargs: dict | None = None, + maximum_buffer_size_in_bytes: int = 4 * 10**9, + order_results: bool = True, +) -> None: + """ + Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + + 'Parsing' here means: + - limiting only to requests of the specified type (i.e., GET, PUT, etc.) + - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + + Parameters + ---------- + raw_s3_log_file_path : string or pathlib.Path + Path to the raw S3 log file. + parsed_s3_log_folder_path : string or pathlib.Path + The path to write each parsed S3 log file to. + There will be one file per handled asset ID. + mode : "w" or "a", default: "a" + How to resolve the case when files already exist in the folder containing parsed logs. + "w" will overwrite existing content, "a" will append or create if the file does not yet exist. + + The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate + over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. + excluded_ips : collections.defaultdict of strings to booleans, optional + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + exclude_github_ips : bool, default: True + Include all GitHub action IP addresses in the `excluded_ips`. + asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_buffer_size_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text file. + order_results : bool, default: True + Whether to order the results chronologically. + This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after + multiple steps of processing instead of during this operation. + """ + tqdm_kwargs = tqdm_kwargs or dict() + + bucket = "dandiarchive" + request_type = "GET" + + # Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration + # Exclude GitHub actions, which are responsible for running health checks on archive which bloat the logs + excluded_ips = excluded_ips or collections.defaultdict(bool) + if exclude_github_ips: + for github_ip in _get_latest_github_ip_ranges(): + excluded_ips[github_ip] = True + + if asset_id_handler is None: + + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + parse_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=parsed_s3_log_folder_path, + mode=mode, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + asset_id_handler=asset_id_handler, + tqdm_kwargs=tqdm_kwargs, + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, + order_results=order_results, + ) + + return None diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index d78bf47..3c05cbf 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -1,21 +1,15 @@ """Primary functions for parsing raw S3 log file for DANDI.""" import collections -import datetime import pathlib -import os import shutil -import traceback import uuid -from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Callable, Literal -import importlib.metadata import pandas import tqdm from ._ip_utils import ( - _get_latest_github_ip_ranges, _load_ip_address_to_region_cache, _save_ip_address_to_region_cache, ) @@ -25,312 +19,6 @@ from ._order_parsed_logs import order_parsed_logs -def parse_all_dandi_raw_s3_logs( - *, - base_raw_s3_log_folder_path: str | pathlib.Path, - parsed_s3_log_folder_path: str | pathlib.Path, - excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, - number_of_jobs: int = 1, - maximum_ram_usage_in_bytes: int = 4 * 10**9, -) -> None: - """ - Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. - - Assumes the following folder structure... - - |- - |-- 2019 (year) - |--- 01 (month) - |---- 01.log (day) - | ... - - Parameters - ---------- - base_raw_s3_log_folder_path : string or pathlib.Path - Path to the folder containing the raw S3 log files. - parsed_s3_log_folder_path : string or pathlib.Path - Path to write each parsed S3 log file to. - There will be one file per handled asset ID. - excluded_ips : collections.defaultdict of strings to booleans, optional - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. - number_of_jobs : int, default: 1 - The number of jobs to use for parallel processing. - Allows negative range to mean 'all but this many (minus one) jobs'. - E.g., -1 means use all workers, -2 means all but one worker. - WARNING: planned but not yet supported. - maximum_ram_usage_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to be used across all the processes. - """ - base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) - parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) - parsed_s3_log_folder_path.mkdir(exist_ok=True) - - # Create a fresh temporary directory in the home folder and then fresh subfolders for each job - temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" - temporary_base_folder_path.mkdir(exist_ok=True) - - # Clean up any previous tasks that failed to clean themselves up - for previous_task_folder_path in temporary_base_folder_path.iterdir(): - shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) - - task_id = str(uuid.uuid4())[:5] - temporary_folder_path = temporary_base_folder_path / task_id - temporary_folder_path.mkdir(exist_ok=True) - - temporary_output_folder_path = temporary_folder_path / "output" - temporary_output_folder_path.mkdir(exist_ok=True) - - # Re-define some top-level pass-through items here to avoid repeated constructions - excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True - - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - - daily_raw_s3_log_file_paths = list(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - - if number_of_jobs == 1: - for raw_s3_log_file_path in tqdm.tqdm( - iterable=daily_raw_s3_log_file_paths, - desc="Parsing log files...", - position=0, - leave=True, - ): - parse_dandi_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=temporary_output_folder_path, - mode="a", - excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction - asset_id_handler=asset_id_handler, - tqdm_kwargs=dict(position=1, leave=False), - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, - order_results=False, # Will immediately reorder all files at the end - ) - else: - per_job_temporary_folder_paths = list() - for job_index in range(number_of_jobs): - per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" - per_job_temporary_folder_path.mkdir(exist_ok=True) - per_job_temporary_folder_paths.append(per_job_temporary_folder_path) - - maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs - - futures = [] - with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: - for raw_s3_log_file_path in daily_raw_s3_log_file_paths: - futures.append( - executor.submit( - _multi_job_parse_dandi_raw_s3_log, - number_of_jobs=number_of_jobs, - raw_s3_log_file_path=raw_s3_log_file_path, - temporary_folder_path=temporary_folder_path, - excluded_ips=excluded_ips, - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job, - ) - ) - - progress_bar_iterable = tqdm.tqdm( - iterable=as_completed(futures), - desc=f"Parsing log files using {number_of_jobs} jobs...", - total=len(daily_raw_s3_log_file_paths), - position=0, - leave=True, - ) - for future in progress_bar_iterable: - future.result() # This is the call that finally triggers the deployment to the workers - - print("\n\nParallel parsing complete!\n\n") - - for per_job_temporary_folder_path in tqdm.tqdm( - iterable=per_job_temporary_folder_paths, - desc="Merging results across jobs...", - total=len(per_job_temporary_folder_paths), - position=0, - leave=True, - ): - per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) - assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!" - - for per_job_parsed_s3_log_file_path in tqdm.tqdm( - iterable=per_job_parsed_s3_log_file_paths, - desc="Merging results per job...", - total=len(per_job_parsed_s3_log_file_paths), - position=1, - leave=False, - mininterval=1.0, - ): - merged_temporary_file_path = temporary_output_folder_path / per_job_parsed_s3_log_file_path.name - - parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path) - - header = False if merged_temporary_file_path.exists() else True - parsed_s3_log.to_csv( - path_or_buf=merged_temporary_file_path, mode="a", sep="\t", header=header, index=False - ) - - # Always apply this step at the end to be sure we maintained chronological order - # (even if you think order of iteration itself was performed chronologically) - # This step also adds the index counter to the TSV - order_parsed_logs( - unordered_parsed_s3_log_folder_path=temporary_output_folder_path, - ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, - ) - - shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True) - - return None - - -def _multi_job_parse_dandi_raw_s3_log( - *, - number_of_jobs: int, - raw_s3_log_file_path: pathlib.Path, - temporary_folder_path: pathlib.Path, - excluded_ips: collections.defaultdict[str, bool] | None, - maximum_ram_usage_in_bytes: int, -) -> None: - """ - A mostly pass-through function to calculate the job index on the worker and target the correct subfolder. - - Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe) - to a log file. - """ - - try: - error_message = "" - - def asset_id_handler(*, raw_asset_id: str) -> str: - """Apparently callables, even simple built-in ones, cannot be pickled.""" - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - - job_index = os.getpid() % number_of_jobs - per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" - - # Define error catching stuff as part of the try clause - # so that if there is a problem within that, it too is caught - errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" - errors_folder_path.mkdir(exist_ok=True) - - dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") - date = datetime.datetime.now().strftime("%y%m%d") - parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt" - error_message += f"Job index {job_index}/{number_of_jobs} parsing {raw_s3_log_file_path} failed due to\n\n" - - parse_dandi_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=per_job_temporary_folder_path, - mode="a", - excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction - asset_id_handler=asset_id_handler, - tqdm_kwargs=dict(position=job_index + 1, leave=False), - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, - order_results=False, # Always disable this for parallel processing - ) - except Exception as exception: - with open(file=parallel_errors_file_path, mode="a") as io: - error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n" - io.write(error_message) - - return None - - -def parse_dandi_raw_s3_log( - *, - raw_s3_log_file_path: str | pathlib.Path, - parsed_s3_log_folder_path: str | pathlib.Path, - mode: Literal["w", "a"] = "a", - excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, - asset_id_handler: Callable | None = None, - tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 4 * 10**9, - order_results: bool = True, -) -> None: - """ - Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. - - 'Parsing' here means: - - limiting only to requests of the specified type (i.e., GET, PUT, etc.) - - reducing the information to the asset ID, request time, request size, and geographic IP of the requester - - Parameters - ---------- - raw_s3_log_file_path : string or pathlib.Path - Path to the raw S3 log file. - parsed_s3_log_folder_path : string or pathlib.Path - The path to write each parsed S3 log file to. - There will be one file per handled asset ID. - mode : "w" or "a", default: "a" - How to resolve the case when files already exist in the folder containing parsed logs. - "w" will overwrite existing content, "a" will append or create if the file does not yet exist. - - The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate - over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. - HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. - excluded_ips : collections.defaultdict of strings to booleans, optional - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. - asset_id_handler : callable, optional - If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to - translate into nested directory paths) then define a function of the following form: - - # For example - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - tqdm_kwargs : dict, optional - Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to be used throughout the process. - order_results : bool, default: True - Whether to order the results chronologically. - This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after - multiple steps of processing instead of during this operation. - """ - tqdm_kwargs = tqdm_kwargs or dict() - - bucket = "dandiarchive" - request_type = "GET" - - # Form a lookup for IP addresses to exclude; much faster than asking 'if in' a list on each iteration - # Exclude GitHub actions, which are responsible for running health checks on archive which bloat the logs - excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True - - if asset_id_handler is None: - - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - - parse_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=parsed_s3_log_folder_path, - mode=mode, - bucket=bucket, - request_type=request_type, - excluded_ips=excluded_ips, - asset_id_handler=asset_id_handler, - tqdm_kwargs=tqdm_kwargs, - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, - order_results=order_results, - ) - - return None - - def parse_raw_s3_log( *, raw_s3_log_file_path: str | pathlib.Path, @@ -341,7 +29,7 @@ def parse_raw_s3_log( excluded_ips: collections.defaultdict[str, bool] | None = None, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 4 * 10**9, + maximum_buffer_size_in_bytes: int = 4 * 10**9, order_results: bool = True, ) -> None: """ @@ -381,8 +69,9 @@ def asset_id_handler(*, raw_asset_id: str) -> str: return split_by_slash[0] + "_" + split_by_slash[-1] tqdm_kwargs : dict, optional Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + maximum_buffer_size_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text file. order_results : bool, default: True Whether to order the results chronologically. This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after @@ -415,7 +104,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: request_type=request_type, excluded_ips=excluded_ips, tqdm_kwargs=tqdm_kwargs, - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) reduced_logs_binned_by_unparsed_asset = dict() @@ -465,7 +154,7 @@ def _get_reduced_log_lines( request_type: Literal["GET", "PUT"], excluded_ips: collections.defaultdict[str, bool], tqdm_kwargs: dict | None = None, - maximum_ram_usage_in_bytes: int = 4 * 10**9, + maximum_buffer_size_in_bytes: int = 4 * 10**9, ) -> list[ReducedLogLine]: """ Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. @@ -482,8 +171,9 @@ def _get_reduced_log_lines( A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. tqdm_kwargs : dict, optional Keyword arguments to pass to the tqdm progress bar. - maximum_ram_usage_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + maximum_buffer_size_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text file. """ assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" @@ -502,7 +192,7 @@ def _get_reduced_log_lines( reduced_log_lines = list() per_buffer_index = 0 buffered_text_reader = BufferedTextReader( - file_path=raw_s3_log_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes + file_path=raw_s3_log_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes ) for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): index = 0 diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index b752d13..08b4edb 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -161,10 +161,8 @@ def _append_reduced_log_line( if full_log_line.bucket != bucket: return None - # Skip all non-perfect status - # 200: Success - # 206: Partial Content - if full_log_line.status_code not in ["200", "206"]: + # Skip all non-success status codes (those in the 200 block) + if full_log_line.status_code[0] != "2": return None # Derived from command string, e.g., "HEAD /blobs/b38/..." diff --git a/tests/test_buffered_text_reader.py b/tests/test_buffered_text_reader.py index 6fae0f8..7ef6b13 100644 --- a/tests/test_buffered_text_reader.py +++ b/tests/test_buffered_text_reader.py @@ -35,9 +35,9 @@ def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory): def test_buffered_text_reader(large_text_file_path: pathlib.Path): - maximum_ram_usage_in_bytes = 10**6 # 1 MB + maximum_buffer_size_in_bytes = 10**6 # 1 MB buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=large_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes + file_path=large_text_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes ) assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!" @@ -55,15 +55,15 @@ def test_buffered_text_reader(large_text_file_path: pathlib.Path): def test_value_error(single_line_text_file_path: pathlib.Path): - maximum_ram_usage_in_bytes = 10**6 # 1 MB + maximum_buffer_size_in_bytes = 10**6 # 1 MB with pytest.raises(ValueError) as error_info: buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=single_line_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes + file_path=single_line_text_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes ) next(buffered_text_reader) expected_message = ( "BufferedTextReader encountered a line at offset 0 that exceeds the buffer size! " - "Try increasing the `buffer_size_in_bytes` to account for this line." + "Try increasing the `maximum_buffer_size_in_bytes` to account for this line." ) assert str(error_info.value) == expected_message diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py index 646eee9..60a7ce3 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -97,7 +97,7 @@ def test_parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local): dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( base_raw_s3_log_folder_path=examples_folder_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, - number_of_jobs=2, + maximum_number_of_workers=2, ) test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir())