diff --git a/pyproject.toml b/pyproject.toml index bb66178..608acf4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "PyYAML", "click", "natsort", + "dandi", ] classifiers = [ "Programming Language :: Python", diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py index 279cb4c..ec3f54f 100644 --- a/src/dandi_s3_log_parser/__init__.py +++ b/src/dandi_s3_log_parser/__init__.py @@ -3,6 +3,7 @@ from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt from ._s3_log_file_parser import parse_dandi_raw_s3_log, parse_raw_s3_log, parse_all_dandi_raw_s3_logs from ._buffered_text_reader import BufferedTextReader +from ._order_parsed_logs import order_parsed_logs __all__ = [ "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH", @@ -12,4 +13,5 @@ "parse_raw_s3_log", "parse_dandi_raw_s3_log", "parse_all_dandi_raw_s3_logs", + "order_parsed_logs", ] diff --git a/src/dandi_s3_log_parser/_buffered_text_reader.py b/src/dandi_s3_log_parser/_buffered_text_reader.py index 6ebe0bf..df175cc 100644 --- a/src/dandi_s3_log_parser/_buffered_text_reader.py +++ b/src/dandi_s3_log_parser/_buffered_text_reader.py @@ -2,7 +2,7 @@ class BufferedTextReader: - def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10**9): + def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage_in_bytes: int = 10**9): """ Lazily read a text file into RAM using buffers of a specified size. @@ -10,15 +10,15 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_ram_usage: int = 10 ---------- file_path : string or pathlib.Path The path to the text file to be read. - maximum_ram_usage : int, default: 1 GB - The theoretical maximum amount of RAM to be used by the BufferedTextReader object. + maximum_ram_usage_in_bytes : int, default: 1 GB + The theoretical maximum amount of RAM (in bytes) to be used by the BufferedTextReader object. 
""" self.file_path = file_path - self.maximum_ram_usage = maximum_ram_usage + self.maximum_ram_usage_in_bytes = maximum_ram_usage_in_bytes # The actual amount of bytes to read per iteration is 3x less than theoretical maximum usage # due to decoding and handling - self.buffer_size_in_bytes = int(maximum_ram_usage / 3) + self.buffer_size_in_bytes = int(maximum_ram_usage_in_bytes / 3) self.total_file_size = pathlib.Path(file_path).stat().st_size self.offset = 0 diff --git a/src/dandi_s3_log_parser/_order_parsed_logs.py b/src/dandi_s3_log_parser/_order_parsed_logs.py new file mode 100644 index 0000000..75fe932 --- /dev/null +++ b/src/dandi_s3_log_parser/_order_parsed_logs.py @@ -0,0 +1,19 @@ +import pathlib +import pandas + + +def order_parsed_logs( + unordered_parsed_s3_log_folder_path: pathlib.Path, ordered_parsed_s3_log_folder_path: pathlib.Path +) -> None: + """Order the contents of all parsed log files chronologically.""" + ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True) + + for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir(): + unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, index_col=0) + ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp") + + # correct index of first column + ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log)) + + ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name + ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t") diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 9f75438..c31e8cf 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -3,6 +3,10 @@ import collections import datetime import pathlib +import os +import shutil +import uuid +from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Callable, Literal import pandas @@ -17,189 +21,202 @@ from ._s3_log_line_parser import ReducedLogLine, _append_reduced_log_line from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH from ._buffered_text_reader import BufferedTextReader +from ._order_parsed_logs import order_parsed_logs -def _get_reduced_log_lines( - *, - raw_s3_log_file_path: pathlib.Path, - bucket: str | None, - request_type: Literal["GET", "PUT"], - excluded_ips: collections.defaultdict[str, bool], - tqdm_kwargs: dict | None = None, -) -> list[ReducedLogLine]: - """ - Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. - - Parameters - ---------- - raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - bucket : str - Only parse and return lines that match this bucket. - request_type : str - The type of request to filter for. - excluded_ips : collections.defaultdict of strings to booleans - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - """ - assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" 
- - # Collapse bucket to empty string instead of asking if it is None on each iteration - bucket = "" if bucket is None else bucket - tqdm_kwargs = tqdm_kwargs or dict() - - # One-time initialization/read of IP address to region cache for performance - # This dictionary is intended to be mutated throughout the process - ip_address_to_region = _load_ip_address_to_region_cache() - - # Perform I/O read in batches to improve performance - resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0) - resolved_tqdm_kwargs.update(tqdm_kwargs) - - reduced_log_lines = list() - per_buffer_index = 0 - buffered_text_reader = BufferedTextReader(file_path=raw_s3_log_file_path) - for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): - for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index): - _append_reduced_log_line( - raw_line=raw_line, - reduced_log_lines=reduced_log_lines, - bucket=bucket, - request_type=request_type, - excluded_ips=excluded_ips, - log_file_path=raw_s3_log_file_path, - index=index, - ip_hash_to_region=ip_address_to_region, - ) - per_buffer_index += index - - _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region) - - return reduced_log_lines - - -def parse_raw_s3_log( +def parse_all_dandi_raw_s3_logs( *, - raw_s3_log_file_path: str | pathlib.Path, + base_raw_s3_log_folder_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, mode: Literal["w", "a"] = "a", - bucket: str | None = None, - request_type: Literal["GET", "PUT"] = "GET", excluded_ips: collections.defaultdict[str, bool] | None = None, + exclude_github_ips: bool = True, number_of_jobs: int = 1, - total_memory_in_bytes: int = 1e9, - asset_id_handler: Callable | None = None, - tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ - Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. - 'Parsing' here means: - - limiting only to requests of the specified type (i.e., GET, PUT, etc.) - - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + Assumes the following folder structure... + + |- + |-- 2019 (year) + |--- 01 (month) + |---- 01.log (day) + | ... Parameters ---------- - raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - parsed_s3_log_folder_path : str or pathlib.Path + base_raw_s3_log_folder_path : string or pathlib.Path + Path to the folder containing the raw S3 log files. + parsed_s3_log_folder_path : string or pathlib.Path Path to write each parsed S3 log file to. There will be one file per handled asset ID. mode : "w" or "a", default: "a" How to resolve the case when files already exist in the folder containing parsed logs. "w" will overwrite existing content, "a" will append or create if the file does not yet exist. - - The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate - over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. - HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. - bucket : str - Only parse and return lines that match this bucket. - request_type : str, default: "GET" - The type of request to filter for. 
excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + exclude_github_ips : bool, default: True + Include all GitHub action IP addresses in the `excluded_ips`. number_of_jobs : int, default: 1 The number of jobs to use for parallel processing. Allows negative range to mean 'all but this many (minus one) jobs'. E.g., -1 means use all workers, -2 means all but one worker. WARNING: planned but not yet supported. - total_memory_in_bytes : int, default: 2e9 - The number of bytes to load as a buffer into RAM per job. - Will automatically distribute this amount over the number of jobs. - WARNING: planned but not yet supported. - asset_id_handler : callable, optional - If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to - translate into nested directory paths) then define a function of the following form: - - # For example - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + maximum_ram_usage_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to be used across all the processes. """ - raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) + base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) - excluded_ips = excluded_ips or collections.defaultdict(bool) - tqdm_kwargs = tqdm_kwargs or dict() - # TODO: buffering control - # total_file_size_in_bytes = raw_s3_log_file_path.lstat().st_size - # buffer_per_job_in_bytes = int(total_memory_in_bytes / number_of_jobs) - # Approximate using ~600 bytes per line - # number_of_lines_to_read_per_job = int(buffer_per_job_in_bytes / 600) - # number_of_iterations_per_job = int(total_file_size_in_bytes / number_of_lines_to_read_per_job) - - # TODO: finish polishing parallelization - just a draft for now - if number_of_jobs > 1: - raise NotImplementedError("Parallelization has not yet been implemented!") - # for _ in range(5) - # reduced_logs = _get_reduced_logs( - # raw_s3_log_file_path=raw_s3_log_file_path, - # lines_errors_file_path=lines_errors_file_path, - # bucket=bucket, - # request_type=request_type - # ) - else: - reduced_logs = _get_reduced_log_lines( - raw_s3_log_file_path=raw_s3_log_file_path, - bucket=bucket, - request_type=request_type, - excluded_ips=excluded_ips, - tqdm_kwargs=tqdm_kwargs, - ) + # Re-define some top-level pass-through items here to avoid repeated constructions + excluded_ips = excluded_ips or collections.defaultdict(bool) + if exclude_github_ips: + for github_ip in _get_latest_github_ip_ranges(): + excluded_ips[github_ip] = True - reduced_logs_binned_by_unparsed_asset = dict() - for reduced_log in reduced_logs: - raw_asset_id = reduced_log.asset_id - reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( - raw_asset_id, collections.defaultdict(list) - ) + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) - reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) - 
reduced_logs_binned_by_unparsed_asset[raw_asset_id]["region"].append(reduced_log.region) + daily_raw_s3_log_file_paths = list() + base_folder_paths = [path for path in base_raw_s3_log_folder_path.iterdir() if path.stem.startswith("20")] + yearly_folder_paths = natsort.natsorted(seq=list(base_folder_paths)) + for yearly_folder_path in yearly_folder_paths: + monthly_folder_paths = natsort.natsorted(seq=list(yearly_folder_path.iterdir())) - if asset_id_handler is not None: - reduced_logs_binned_by_asset = dict() - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): - parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + for monthly_folder_path in monthly_folder_paths: + daily_raw_s3_log_file_paths.extend(natsort.natsorted(seq=list(monthly_folder_path.glob("*.log")))) - reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset + if number_of_jobs == 1: + for raw_s3_log_file_path in tqdm.tqdm( + iterable=daily_raw_s3_log_file_paths, + desc="Parsing log files...", + position=0, + leave=True, + ): + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=parsed_s3_log_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=1, leave=False), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) else: - reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset + # Create a fresh temporary directory in the home folder and then fresh subfolders for each job + temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" + temporary_base_folder_path.mkdir(exist_ok=True) + + # Clean up any previous tasks that failed to clean themselves up + for previous_task_folder_path in temporary_base_folder_path.iterdir(): + shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) + + task_id = uuid.uuid4()[:5] + temporary_folder_path = temporary_base_folder_path / task_id + temporary_folder_path.mkdir(exist_ok=True) + per_job_temporary_folder_paths = list() + for job_index in range(number_of_jobs): + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + per_job_temporary_folder_path.mkdir(exist_ok=True) + per_job_temporary_folder_paths.append(per_job_temporary_folder_path) + + maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs + + futures = [] + with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: + for raw_s3_log_file_path in daily_raw_s3_log_file_paths: + futures.append( + executor.submit( + _multi_job_parse_dandi_raw_s3_log, + number_of_jobs=number_of_jobs, + raw_s3_log_file_path=raw_s3_log_file_path, + temporary_folder_path=temporary_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job, + ) + ) + + # Perform the iteration to trigger processing + for _ in tqdm.tqdm( + iterable=as_completed(daily_raw_s3_log_file_paths), + desc=f"Parsing log files using {number_of_jobs} jobs...", + total=len(daily_raw_s3_log_file_paths), + position=0, + leave=True, + ): + pass + + merged_temporary_folder_path = temporary_folder_path / "merged" + merged_temporary_folder_path.mkdir(exist_ok=True) + + print("\n\nParallel parsing complete!\n\n") + + for per_job_temporary_folder_path in tqdm.tqdm( 
+ iterable=per_job_temporary_folder_paths, + desc="Merging results across jobs...", + total=len(per_job_temporary_folder_paths), + position=0, + leave=True, + ): + per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) + for per_job_parsed_s3_log_file_path in tqdm.tqdm( + iterable=per_job_parsed_s3_log_file_paths, + desc="Merging results per job...", + total=len(per_job_parsed_s3_log_file_paths), + position=1, + leave=False, + mininterval=1.0, + ): + merged_temporary_file_path = merged_temporary_folder_path / per_job_parsed_s3_log_file_path.name + + parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path, index_col=0) + parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t") + + order_parsed_logs( + unordered_parsed_s3_log_folder_path=merged_temporary_folder_path, + ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, + ) - for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" + shutil.rmtree(path=temporary_folder_path, ignore_errors=True) - data_frame = pandas.DataFrame(data=reduced_logs_per_asset) - data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") + return None - progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress" - progress_folder_path.mkdir(exist_ok=True) - date = datetime.datetime.now().strftime("%y%m%d") - progress_file_path = progress_folder_path / f"{date}.txt" - with open(file=progress_file_path, mode="a") as io: - io.write(f"Parsed {raw_s3_log_file_path} successfully!\n") +def _multi_job_parse_dandi_raw_s3_log( + *, + number_of_jobs: int, + raw_s3_log_file_path: pathlib.Path, + temporary_folder_path: pathlib.Path, + mode: Literal["w", "a"], + excluded_ips: collections.defaultdict[str, bool] | None, + exclude_github_ips: bool, + asset_id_handler: Callable | None, + maximum_ram_usage_in_bytes: int, +) -> None: + """A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.""" + job_index = os.getpid() % number_of_jobs + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=per_job_temporary_folder_path, + mode=mode, + excluded_ips=excluded_ips, + exclude_github_ips=exclude_github_ips, + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=job_index + 1, leave=False), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) + + return None def parse_dandi_raw_s3_log( @@ -211,6 +228,7 @@ def parse_dandi_raw_s3_log( exclude_github_ips: bool = True, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. @@ -245,6 +263,10 @@ def parse_dandi_raw_s3_log( def asset_id_handler(*, raw_asset_id: str) -> str: split_by_slash = raw_asset_id.split("/") return split_by_slash[0] + "_" + split_by_slash[-1] + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. 
""" tqdm_kwargs = tqdm_kwargs or dict() @@ -273,78 +295,176 @@ def asset_id_handler(*, raw_asset_id: str) -> str: excluded_ips=excluded_ips, asset_id_handler=asset_id_handler, tqdm_kwargs=tqdm_kwargs, + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, ) -def parse_all_dandi_raw_s3_logs( +def parse_raw_s3_log( *, - base_raw_s3_log_folder_path: str | pathlib.Path, + raw_s3_log_file_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, mode: Literal["w", "a"] = "a", + bucket: str | None = None, + request_type: Literal["GET", "PUT"] = "GET", excluded_ips: collections.defaultdict[str, bool] | None = None, - exclude_github_ips: bool = True, + asset_id_handler: Callable | None = None, + tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 4 * 10**9, ) -> None: """ - Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files. - - Assumes the following folder structure... + Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. - |- - |-- 2019 (year) - |--- 01 (month) - |---- 01.log (day) - | ... + 'Parsing' here means: + - limiting only to requests of the specified type (i.e., GET, PUT, etc.) + - reducing the information to the asset ID, request time, request size, and geographic IP of the requester Parameters ---------- - base_raw_s3_log_folder_path : string or pathlib.Path - Path to the folder containing the raw S3 log files. - parsed_s3_log_folder_path : string or pathlib.Path + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + parsed_s3_log_folder_path : str or pathlib.Path Path to write each parsed S3 log file to. There will be one file per handled asset ID. mode : "w" or "a", default: "a" How to resolve the case when files already exist in the folder containing parsed logs. "w" will overwrite existing content, "a" will append or create if the file does not yet exist. + + The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate + over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + HINT: If this iteration is done in chronological order, the resulting parsed logs will also maintain that order. + bucket : str + Only parse and return lines that match this bucket. + request_type : str, default: "GET" + The type of request to filter for. excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - exclude_github_ips : bool, default: True - Include all GitHub action IP addresses in the `excluded_ips`. + asset_id_handler : callable, optional + If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to + translate into nested directory paths) then define a function of the following form: + + # For example + def asset_id_handler(*, raw_asset_id: str) -> str: + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. 
""" - base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path) + raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) - - # Re-define some top-level pass-through items here to avoid repeated constructions excluded_ips = excluded_ips or collections.defaultdict(bool) - if exclude_github_ips: - for github_ip in _get_latest_github_ip_ranges(): - excluded_ips[github_ip] = True + tqdm_kwargs = tqdm_kwargs or dict() - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] + reduced_logs = _get_reduced_log_lines( + raw_s3_log_file_path=raw_s3_log_file_path, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + tqdm_kwargs=tqdm_kwargs, + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + ) - daily_raw_s3_log_file_paths = list() - base_folder_paths = [path for path in base_raw_s3_log_folder_path.iterdir() if path.stem.startswith("20")] - yearly_folder_paths = natsort.natsorted(seq=list(base_folder_paths)) - for yearly_folder_path in yearly_folder_paths: - monthly_folder_paths = natsort.natsorted(seq=list(yearly_folder_path.iterdir())) + reduced_logs_binned_by_unparsed_asset = dict() + for reduced_log in reduced_logs: + raw_asset_id = reduced_log.asset_id + reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get( + raw_asset_id, collections.defaultdict(list) + ) - for monthly_folder_path in monthly_folder_paths: - daily_raw_s3_log_file_paths.extend(natsort.natsorted(seq=list(monthly_folder_path.glob("*.log")))) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["bytes_sent"].append(reduced_log.bytes_sent) + reduced_logs_binned_by_unparsed_asset[raw_asset_id]["region"].append(reduced_log.region) - for raw_s3_log_file_path in tqdm.tqdm( - iterable=daily_raw_s3_log_file_paths, - desc="Parsing log files...", - position=0, - leave=True, - ): - parse_dandi_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=parsed_s3_log_folder_path, - mode=mode, - excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction - asset_id_handler=asset_id_handler, - tqdm_kwargs=dict(position=1, leave=False), - ) + if asset_id_handler is not None: + reduced_logs_binned_by_asset = dict() + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_unparsed_asset.items(): + parsed_asset_id = asset_id_handler(raw_asset_id=raw_asset_id) + + reduced_logs_binned_by_asset[parsed_asset_id] = reduced_logs_per_asset + else: + reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset + + for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): + parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" + + data_frame = pandas.DataFrame(data=reduced_logs_per_asset) + data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") + + progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress" + progress_folder_path.mkdir(exist_ok=True) + + date = datetime.datetime.now().strftime("%y%m%d") + progress_file_path = progress_folder_path / f"{date}.txt" + with open(file=progress_file_path, mode="a") as io: + io.write(f"Parsed {raw_s3_log_file_path} successfully!\n") + + return None + + 
+def _get_reduced_log_lines( + *, + raw_s3_log_file_path: pathlib.Path, + bucket: str | None, + request_type: Literal["GET", "PUT"], + excluded_ips: collections.defaultdict[str, bool], + tqdm_kwargs: dict | None = None, + maximum_ram_usage_in_bytes: int = 4 * 10**9, +) -> list[ReducedLogLine]: + """ + Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects. + + Parameters + ---------- + raw_s3_log_file_path : str or pathlib.Path + Path to the raw S3 log file. + bucket : str + Only parse and return lines that match this bucket. + request_type : str + The type of request to filter for. + excluded_ips : collections.defaultdict of strings to booleans + A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + tqdm_kwargs : dict, optional + Keyword arguments to pass to the tqdm progress bar. + maximum_ram_usage_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + """ + assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!" + + # Collapse bucket to empty string instead of asking if it is None on each iteration + bucket = "" if bucket is None else bucket + tqdm_kwargs = tqdm_kwargs or dict() + + # One-time initialization/read of IP address to region cache for performance + # This dictionary is intended to be mutated throughout the process + ip_address_to_region = _load_ip_address_to_region_cache() + + # Perform I/O read in batches to improve performance + resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=1.0) + resolved_tqdm_kwargs.update(tqdm_kwargs) + + reduced_log_lines = list() + per_buffer_index = 0 + buffered_text_reader = BufferedTextReader( + file_path=raw_s3_log_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes + ) + for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs): + index = 0 + for raw_line in buffered_raw_lines: + _append_reduced_log_line( + raw_line=raw_line, + reduced_log_lines=reduced_log_lines, + bucket=bucket, + request_type=request_type, + excluded_ips=excluded_ips, + log_file_path=raw_s3_log_file_path, + index=index, + ip_hash_to_region=ip_address_to_region, + ) + index += 1 + per_buffer_index += index + + _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region) + + return reduced_log_lines diff --git a/tests/examples/example_0/example_dandi_s3_log.log b/tests/examples/ordered_example_0/example_dandi_s3_log.log similarity index 100% rename from tests/examples/example_0/example_dandi_s3_log.log rename to tests/examples/ordered_example_0/example_dandi_s3_log.log diff --git a/tests/examples/example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/ordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv similarity index 100% rename from tests/examples/example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv rename to tests/examples/ordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv diff --git a/tests/examples/example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/ordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv similarity index 100% rename from tests/examples/example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv rename to 
tests/examples/ordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv diff --git a/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv new file mode 100644 index 0000000..eed1bdf --- /dev/null +++ b/tests/examples/unordered_example_0/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2020-02-24 05:06:35 124 CA/Ontario +1 2021-05-21 05:06:35 1234 US/California +2 2022-07-01 05:06:35 512 unknown +3 2022-11-04 05:06:35 141424 US/Virginia diff --git a/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..f7d6a57 --- /dev/null +++ b/tests/examples/unordered_example_0/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2019-11-13 05:06:35 4332423 US/Colorado +1 2020-04-16 05:06:35 12313153 unknown +2 2022-03-21 05:06:35 24323 MX/Zacatecas +3 2022-08-05 05:06:35 2141 CH/BeiJing diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv new file mode 100644 index 0000000..931a755 --- /dev/null +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2022-07-01 05:06:35 512 unknown +1 2021-05-21 05:06:35 1234 US/California +2 2022-11-04 05:06:35 141424 US/Virginia +3 2020-02-24 05:06:35 124 CA/Ontario diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..8e0ac58 --- /dev/null +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,5 @@ + timestamp bytes_sent region +0 2022-08-05 05:06:35 2141 CH/BeiJing +1 2022-03-21 05:06:35 24323 MX/Zacatecas +2 2019-11-13 05:06:35 4332423 US/Colorado +3 2020-04-16 05:06:35 12313153 unknown diff --git a/tests/test_buffered_text_reader.py b/tests/test_buffered_text_reader.py index 6ffaa95..6fae0f8 100644 --- a/tests/test_buffered_text_reader.py +++ b/tests/test_buffered_text_reader.py @@ -35,9 +35,9 @@ def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory): def test_buffered_text_reader(large_text_file_path: pathlib.Path): - maximum_ram_usage = 10**6 # 1 MB + maximum_ram_usage_in_bytes = 10**6 # 1 MB buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=large_text_file_path, maximum_ram_usage=maximum_ram_usage + file_path=large_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes ) assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!" 
@@ -55,10 +55,10 @@ def test_buffered_text_reader(large_text_file_path: pathlib.Path): def test_value_error(single_line_text_file_path: pathlib.Path): - maximum_ram_usage = 10**6 # 1 MB + maximum_ram_usage_in_bytes = 10**6 # 1 MB with pytest.raises(ValueError) as error_info: buffered_text_reader = dandi_s3_log_parser.BufferedTextReader( - file_path=single_line_text_file_path, maximum_ram_usage=maximum_ram_usage + file_path=single_line_text_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes ) next(buffered_text_reader) diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py index 4ba1983..6bb8e32 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -16,7 +16,7 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): tmpdir = pathlib.Path(tmpdir) file_parent = pathlib.Path(__file__).parent - examples_folder_path = file_parent / "examples" / "example_0" + examples_folder_path = file_parent / "examples" / "ordered_example_0" example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" @@ -41,11 +41,86 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): test_parsed_s3_log_file_path.stem in expected_asset_ids ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" - test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path) + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) expected_parsed_s3_log_file_path = ( expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" ) - expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) + + +def test_parse_all_dandi_raw_s3_logs_example_0(tmpdir: py.path.local): + tmpdir = pathlib.Path(tmpdir) + + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "ordered_example_0" + expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( + base_raw_s3_log_folder_path=examples_folder_path, + parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test expected_output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 2 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"
+ + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) + + +def test_parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local): + tmpdir = pathlib.Path(tmpdir) + + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "ordered_example_0" + expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( + base_raw_s3_log_folder_path=examples_folder_path, + parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, + number_of_jobs=2, + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test expected_output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 2 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) diff --git a/tests/test_order_parsed_logs.py b/tests/test_order_parsed_logs.py new file mode 100644 index 0000000..b32ee88 --- /dev/null +++ b/tests/test_order_parsed_logs.py @@ -0,0 +1,33 @@ +import pathlib +import py +import pandas + +import dandi_s3_log_parser + + +def test_output_file_reordering(tmpdir: py.path.local) -> None: + """ + Performing parallelized parsing can result in both race conditions and a break to chronological ordering. + + This tests the utility function that reorders the contents of a parsed log file chronologically.
+ """ + tmpdir = pathlib.Path(tmpdir) + + unordered_example_base_folder_path = pathlib.Path(__file__).parent / "examples" / "unordered_example_0" + unordered_parsed_s3_log_folder_path = unordered_example_base_folder_path / "unordered_parsed_logs" + ordered_parsed_s3_log_folder_path = tmpdir + + dandi_s3_log_parser.order_parsed_logs( + unordered_parsed_s3_log_folder_path=unordered_parsed_s3_log_folder_path, + ordered_parsed_s3_log_folder_path=ordered_parsed_s3_log_folder_path, + ) + + parsed_log_file_stems = [path.name for path in unordered_parsed_s3_log_folder_path.iterdir()] + expected_output_folder_path = unordered_example_base_folder_path / "expected_output" + for parsed_log_file_name in parsed_log_file_stems: + test_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / parsed_log_file_name + expected_parsed_s3_log_file_path = expected_output_folder_path / parsed_log_file_name + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)