From e097e0df7b143b8a929697ad946286d3447ae7a5 Mon Sep 17 00:00:00 2001
From: CodyCBakerPhD
Date: Fri, 9 Aug 2024 23:44:02 -0400
Subject: [PATCH] various debugs and enhancements; more tests

---
 .../_command_line_interface.py | 16 ++++
 .../_s3_log_file_parser.py     | 81 ++++++++++++-------
 .../_s3_log_line_parser.py     |  4 +-
 3 files changed, 68 insertions(+), 33 deletions(-)

diff --git a/src/dandi_s3_log_parser/_command_line_interface.py b/src/dandi_s3_log_parser/_command_line_interface.py
index e9b0416..d62a8d1 100644
--- a/src/dandi_s3_log_parser/_command_line_interface.py
+++ b/src/dandi_s3_log_parser/_command_line_interface.py
@@ -1,6 +1,7 @@
 """Call the raw S3 log parser from the command line."""
 
 import collections
+import os
 import pathlib
 import click
 from typing import Literal
@@ -9,6 +10,8 @@
 from .testing._helpers import find_random_example_line
 from ._config import REQUEST_TYPES
 
+NUMBER_OF_CPU = os.cpu_count()  # Note: Not distinguishing if logical or not
+
 
 @click.command(name="parse_all_dandi_raw_s3_logs")
 @click.option(
@@ -45,12 +48,24 @@
     type=str,
     default=None,
 )
+@click.option(
+    "--number_of_jobs",
+    help="The number of jobs to use for parallel processing.",
+    required=False,
+    type=int,
+    default=1,
+)
 def parse_all_dandi_raw_s3_logs_cli(
     base_raw_s3_log_folder_path: str,
     parsed_s3_log_folder_path: str,
     mode: Literal["w", "a"] = "a",
     excluded_ips: str | None = None,
+    number_of_jobs: int = 1,
 ) -> None:
+    number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs
+    assert number_of_jobs > 0, "The number of jobs must be greater than 0."
+    assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs."
+
     split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else []
     handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
     for excluded_ip in split_excluded_ips:
@@ -61,6 +76,7 @@ def parse_all_dandi_raw_s3_logs_cli(
         parsed_s3_log_folder_path=parsed_s3_log_folder_path,
         mode=mode,
         excluded_ips=handled_excluded_ips,
+        number_of_jobs=number_of_jobs,
     )
diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 24180ee..d78bf47 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -5,9 +5,11 @@
 import pathlib
 import os
 import shutil
+import traceback
 import uuid
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from typing import Callable, Literal
+import importlib.metadata
 
 import pandas
 import tqdm
@@ -130,21 +132,19 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                     raw_s3_log_file_path=raw_s3_log_file_path,
                     temporary_folder_path=temporary_folder_path,
                     excluded_ips=excluded_ips,
-                    exclude_github_ips=False,  # Already included in list so avoid repeated construction
-                    asset_id_handler=asset_id_handler,
                     maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job,
                 )
             )
 
-        # Perform the iteration to trigger processing
-        for _ in tqdm.tqdm(
+        progress_bar_iterable = tqdm.tqdm(
             iterable=as_completed(futures),
             desc=f"Parsing log files using {number_of_jobs} jobs...",
             total=len(daily_raw_s3_log_file_paths),
             position=0,
             leave=True,
-        ):
-            pass
+        )
+        for future in progress_bar_iterable:
+            future.result()  # This is the call that finally triggers the deployment to the workers
 
     print("\n\nParallel parsing complete!\n\n")
 
@@ -156,6 +156,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         leave=True,
     ):
         per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir())
+        assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!"
+
         for per_job_parsed_s3_log_file_path in tqdm.tqdm(
             iterable=per_job_parsed_s3_log_file_paths,
             desc="Merging results per job...",
@@ -192,25 +194,51 @@ def _multi_job_parse_dandi_raw_s3_log(
     raw_s3_log_file_path: pathlib.Path,
     temporary_folder_path: pathlib.Path,
     excluded_ips: collections.defaultdict[str, bool] | None,
-    exclude_github_ips: bool,
-    asset_id_handler: Callable | None,
     maximum_ram_usage_in_bytes: int,
 ) -> None:
-    """A mostly pass-through function to calculate the job index on the worker and target the correct subfolder."""
-    job_index = os.getpid() % number_of_jobs
-    per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
+    """
+    A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.
 
-    parse_dandi_raw_s3_log(
-        raw_s3_log_file_path=raw_s3_log_file_path,
-        parsed_s3_log_folder_path=per_job_temporary_folder_path,
-        mode="a",
-        excluded_ips=excluded_ips,
-        exclude_github_ips=exclude_github_ips,
-        asset_id_handler=asset_id_handler,
-        tqdm_kwargs=dict(position=job_index + 1, leave=False),
-        maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes,
-        order_results=False,  # Always disable this for parallel processing
-    )
+    Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe)
+    to a log file.
+    """
+
+    try:
+        error_message = ""
+
+        def asset_id_handler(*, raw_asset_id: str) -> str:
+            """Apparently callables, even simple built-in ones, cannot be pickled."""
+            split_by_slash = raw_asset_id.split("/")
+            return split_by_slash[0] + "_" + split_by_slash[-1]
+
+        job_index = os.getpid() % number_of_jobs
+        per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
+
+        # Define error catching stuff as part of the try clause
+        # so that if there is a problem within that, it too is caught
+        errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
+        errors_folder_path.mkdir(exist_ok=True)
+
+        dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
+        date = datetime.datetime.now().strftime("%y%m%d")
+        parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
+        error_message += f"Job index {job_index}/{number_of_jobs} parsing {raw_s3_log_file_path} failed due to\n\n"
+
+        parse_dandi_raw_s3_log(
+            raw_s3_log_file_path=raw_s3_log_file_path,
+            parsed_s3_log_folder_path=per_job_temporary_folder_path,
+            mode="a",
+            excluded_ips=excluded_ips,
+            exclude_github_ips=False,  # Already included in list so avoid repeated construction
+            asset_id_handler=asset_id_handler,
+            tqdm_kwargs=dict(position=job_index + 1, leave=False),
+            maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes,
+            order_results=False,  # Always disable this for parallel processing
+        )
+    except Exception as exception:
+        with open(file=parallel_errors_file_path, mode="a") as io:
+            error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n"
+            io.write(error_message)
 
     return None
 
@@ -419,15 +447,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True
     data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)
 
-    # Keep a log of progress
-    progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress"
-    progress_folder_path.mkdir(exist_ok=True)
-
-    date = datetime.datetime.now().strftime("%y%m%d")
-    progress_file_path = progress_folder_path / f"{date}.txt"
-    with open(file=progress_file_path, mode="a") as io:
-        io.write(f"Parsed {raw_s3_log_file_path} successfully!\n")
-
     if order_results is True:
         order_parsed_logs(
             unordered_parsed_s3_log_folder_path=temporary_output_folder_path,
diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py
index 57178f1..b752d13 100644
--- a/src/dandi_s3_log_parser/_s3_log_line_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -18,7 +18,7 @@
 import datetime
 import pathlib
 import re
-from importlib.metadata import version as importlib_version
+import importlib.metadata
 
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
 from ._ip_utils import _get_region_from_ip_address
@@ -104,7 +104,7 @@ def _get_full_log_line(
     errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
     errors_folder_path.mkdir(exist_ok=True)
 
-    dandi_s3_log_parser_version = importlib_version(distribution_name="dandi_s3_log_parser")
+    dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
     date = datetime.datetime.now().strftime("%y%m%d")
     lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt"
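
For context on the new `--number_of_jobs` option added to `parse_all_dandi_raw_s3_logs_cli`: negative values are translated relative to the CPU count before the bounds checks run. A minimal sketch of that normalization, assuming an 8-CPU machine; the `normalize_number_of_jobs` helper name is illustrative and not part of the package:

```python
import os

NUMBER_OF_CPU = os.cpu_count()  # same module-level constant the patch introduces


def normalize_number_of_jobs(number_of_jobs: int) -> int:
    """Translate negative job counts into positive ones, mirroring the CLI logic in the patch."""
    # -1 means "use all CPUs", -2 means "all but one", and so on
    number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs
    assert number_of_jobs > 0, "The number of jobs must be greater than 0."
    assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs."
    return number_of_jobs


# On an 8-CPU machine: normalize_number_of_jobs(-1) == 8, normalize_number_of_jobs(-2) == 7, normalize_number_of_jobs(3) == 3
```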
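
The `_multi_job_parse_dandi_raw_s3_log` changes wrap the per-worker parse in a `try`/`except` that appends the full traceback to a versioned, dated error file, since a worker's stack trace is otherwise only raised in the main process once `future.result()` is called. A rough standalone sketch of that logging pattern, assuming the package is installed; the `record_worker_error` helper and its arguments are placeholders rather than the package's API:

```python
import datetime
import importlib.metadata
import pathlib
import traceback


def record_worker_error(errors_folder_path: pathlib.Path, context: str) -> None:
    """Append the currently handled exception's traceback to a versioned, dated error log."""
    errors_folder_path.mkdir(parents=True, exist_ok=True)
    version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
    date = datetime.datetime.now().strftime("%y%m%d")
    parallel_errors_file_path = errors_folder_path / f"v{version}_{date}_parallel_errors.txt"
    with open(file=parallel_errors_file_path, mode="a") as io:
        io.write(f"{context}\n\n{traceback.format_exc()}\n\n")


# Example usage inside a worker:
# try:
#     ...  # per-job parsing
# except Exception:
#     record_worker_error(errors_folder_path=pathlib.Path("errors"), context="Job index 3/8 failed due to")
```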
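
The worker now defines `asset_id_handler` locally instead of receiving it through `executor.submit`, per the patch's note that the callable could not be pickled across processes. Its behavior is simply to keep the first and last components of a slash-delimited asset key; the example key below is made up for illustration:

```python
def asset_id_handler(*, raw_asset_id: str) -> str:
    """Keep the first and last slash-delimited components of a raw asset key."""
    split_by_slash = raw_asset_id.split("/")
    return split_by_slash[0] + "_" + split_by_slash[-1]


# Hypothetical key; the real DANDI key layout is not shown in this patch
assert asset_id_handler(raw_asset_id="blobs/abc/def/11111111-2222-3333-4444-555555555555") == (
    "blobs_11111111-2222-3333-4444-555555555555"
)
```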