diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
index d4bec6a..d8dc64d 100644
--- a/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
+++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -46,7 +46,7 @@ def reduce_all_dandi_raw_s3_logs(
     base_raw_s3_logs_folder_path : file path
         The Path to the folder containing the raw S3 log files to be reduced.
     reduced_s3_logs_folder_path : file path
-        The Path to write each parsed S3 log file to.
+        The Path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
     maximum_number_of_workers : int, default: 1
         The maximum number of workers to distribute tasks across.
@@ -61,8 +61,6 @@ def reduce_all_dandi_raw_s3_logs(
     excluded_ips : collections.defaultdict(bool), optional
         A lookup table whose keys are IP addresses to exclude from reduction.
     """
-    base_raw_s3_logs_folder_path = pathlib.Path(base_raw_s3_logs_folder_path)
-    reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path)
     excluded_ips = excluded_ips or collections.defaultdict(bool)
     asset_id_handler = _get_default_dandi_asset_id_handler()
 
@@ -119,9 +117,9 @@ def reduce_all_dandi_raw_s3_logs(
             futures.append(
                 executor.submit(
                     _multi_worker_reduce_dandi_raw_s3_log,
-                    maximum_number_of_workers=maximum_number_of_workers,
                     raw_s3_log_file_path=raw_s3_log_file_path,
                     temporary_folder_path=temporary_folder_path,
+                    maximum_number_of_workers=maximum_number_of_workers,
                     maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_worker,
                     excluded_ips=excluded_ips,
                 ),
@@ -149,22 +147,22 @@ def reduce_all_dandi_raw_s3_logs(
         leave=True,
         mininterval=2.0,
     ):
-        per_worker_parsed_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
+        per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
         assert (
-            len(per_worker_parsed_s3_log_file_paths) != 0
+            len(per_worker_reduced_s3_log_file_paths) != 0
        ), f"No files found in {per_worker_temporary_folder_path}!"
 
-        for per_worker_parsed_s3_log_file_path in tqdm.tqdm(
-            iterable=per_worker_parsed_s3_log_file_paths,
+        for per_worker_reduced_s3_log_file_path in tqdm.tqdm(
+            iterable=per_worker_reduced_s3_log_file_paths,
             desc="Merging results per worker...",
-            total=len(per_worker_parsed_s3_log_file_paths),
+            total=len(per_worker_reduced_s3_log_file_paths),
             position=1,
             leave=False,
             mininterval=2.0,
         ):
-            merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_parsed_s3_log_file_path.name
+            merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name
 
-            parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_parsed_s3_log_file_path, header=0)
+            parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)
 
             header = False if merged_temporary_file_path.exists() else True
             parsed_s3_log.to_csv(
@@ -182,11 +180,11 @@  # pragma: no cover
 
 def _multi_worker_reduce_dandi_raw_s3_log(
     *,
-    maximum_number_of_workers: int,
     raw_s3_log_file_path: pathlib.Path,
     temporary_folder_path: pathlib.Path,
-    excluded_ips: collections.defaultdict[str, bool] | None,
+    maximum_number_of_workers: int,
     maximum_buffer_size_in_bytes: int,
+    excluded_ips: collections.defaultdict[str, bool] | None,
 ) -> None:
     """
     A mostly pass-through function to calculate the worker index on the worker and target the correct subfolder.
@@ -211,7 +209,7 @@ def _multi_worker_reduce_dandi_raw_s3_log(
     date = datetime.datetime.now().strftime("%y%m%d")
     parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
     error_message += (
-        f"Worker index {worker_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
+        f"Worker index {worker_index}/{maximum_number_of_workers} reducing {raw_s3_log_file_path} failed due to\n\n"
     )
 
     reduce_dandi_raw_s3_log(
@@ -243,7 +241,7 @@ def reduce_dandi_raw_s3_log(
     tqdm_kwargs: dict | None = None,
 ) -> None:
     """
-    Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
+    Reduce a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
 
     'Parsing' here means:
       - limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -254,7 +252,7 @@ def reduce_dandi_raw_s3_log(
     raw_s3_log_file_path : string or pathlib.Path
         Path to the raw S3 log file to be reduced.
     reduced_s3_logs_folder_path : string or pathlib.Path
-        The path to write each parsed S3 log file to.
+        The path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
     mode : "w" or "a", default: "a"
         How to resolve the case when files already exist in the folder containing parsed logs.
@@ -290,7 +288,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
 
     reduce_raw_s3_log(
         raw_s3_log_file_path=raw_s3_log_file_path,
-        parsed_s3_log_folder_path=reduced_s3_logs_folder_path,
+        reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
         mode=mode,
         maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
         bucket=bucket,
diff --git a/src/dandi_s3_log_parser/_s3_log_file_reducer.py b/src/dandi_s3_log_parser/_s3_log_file_reducer.py
index 092f1d5..4ee98a1 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_reducer.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_reducer.py
@@ -10,7 +10,7 @@
 
 import pandas
 import tqdm
-from pydantic import FilePath, validate_call
+from pydantic import DirectoryPath, FilePath, validate_call
 
 from ._buffered_text_reader import BufferedTextReader
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
@@ -21,7 +21,7 @@
 def reduce_raw_s3_log(
     *,
     raw_s3_log_file_path: FilePath,
-    parsed_s3_log_folder_path: FilePath,
+    reduced_s3_logs_folder_path: DirectoryPath,
     mode: Literal["w", "a"] = "a",
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
     bucket: str | None = None,
@@ -44,8 +44,8 @@
     ----------
     raw_s3_log_file_path : str or pathlib.Path
         The path to the raw S3 log file.
-    parsed_s3_log_folder_path : str or pathlib.Path
-        The path to write each parsed S3 log file to.
+    reduced_s3_logs_folder_path : str or pathlib.Path
+        The path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
     mode : "w" or "a", default: "a"
         How to resolve the case when files already exist in the folder containing parsed logs.
@@ -75,9 +75,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     tqdm_kwargs : dict, optional
         Keyword arguments to pass to the tqdm progress bar for line buffers.
""" - raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) - parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) - parsed_s3_log_folder_path.mkdir(exist_ok=True) + reduced_s3_logs_folder_path.mkdir(exist_ok=True) bucket = bucket or "" excluded_ips = excluded_ips or collections.defaultdict(bool) asset_id_handler = asset_id_handler or (lambda asset_id: asset_id) @@ -96,7 +94,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: ) for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv" + parsed_s3_log_file_path = reduced_s3_logs_folder_path / f"{handled_asset_id}.tsv" data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id) diff --git a/tests/test_reduce_dandi_raw_s3_log_bad_lines.py b/tests/test_reduce_dandi_raw_s3_log_bad_lines.py index 8e1aeab..92a866d 100644 --- a/tests/test_reduce_dandi_raw_s3_log_bad_lines.py +++ b/tests/test_reduce_dandi_raw_s3_log_bad_lines.py @@ -22,12 +22,14 @@ def test_reduce_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None: example_raw_s3_log_file_path = examples_folder_path / "0.log" expected_reduced_s3_logs_folder_path = examples_folder_path / "expected_output" - test_reduced_s3_log_folder_path = tmpdir / "reduced_example_2" + test_reduced_s3_logs_folder_path = tmpdir / "reduced_example_2" + test_reduced_s3_logs_folder_path.mkdir(exist_ok=True) + dandi_s3_log_parser.reduce_dandi_raw_s3_log( raw_s3_log_file_path=example_raw_s3_log_file_path, - reduced_s3_logs_folder_path=test_reduced_s3_log_folder_path, + reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path, ) - test_output_file_paths = list(test_reduced_s3_log_folder_path.iterdir()) + test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir()) number_of_output_files = len(test_output_file_paths) expected_number_of_output_files = 3