From 039c6502d91aee1b177f9b5711d2043cafca080f Mon Sep 17 00:00:00 2001
From: CodyCBakerPhD
Date: Fri, 9 Aug 2024 12:23:47 -0400
Subject: [PATCH] finalize logical structure; add cleanup; add test

---
 .../_s3_log_file_parser.py        | 12 ++++-
 tests/test_dandi_s3_log_parser.py | 45 +++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 543ec2e..80423a5 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -4,6 +4,7 @@
 import datetime
 import pathlib
 import os
+import shutil
 import uuid
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from typing import Callable, Literal
@@ -110,6 +111,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     # Create a fresh temporary directory in the home folder and then fresh subfolders for each job
     temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp"
     temporary_base_folder_path.mkdir(exist_ok=True)
+
+    # Clean up any previous tasks that failed to clean themselves up
+    for previous_task_folder_path in temporary_base_folder_path.iterdir():
+        shutil.rmtree(path=previous_task_folder_path, ignore_errors=True)
+
     task_id = str(uuid.uuid4())[:5]
     temporary_folder_path = temporary_base_folder_path / task_id
     temporary_folder_path.mkdir(exist_ok=True)
@@ -179,6 +185,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path,
         )
 
+    shutil.rmtree(path=temporary_folder_path, ignore_errors=True)
+
     return None
 
 
@@ -442,17 +450,19 @@ def _get_reduced_log_lines(
         file_path=raw_s3_log_file_path, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes
     )
     for buffered_raw_lines in tqdm.tqdm(iterable=buffered_text_reader, **resolved_tqdm_kwargs):
-        for index, raw_line in enumerate(iterable=buffered_raw_lines, start=per_buffer_index):
+        index = 0
+        for raw_line in buffered_raw_lines:
             _append_reduced_log_line(
                 raw_line=raw_line,
                 reduced_log_lines=reduced_log_lines,
                 bucket=bucket,
                 request_type=request_type,
                 excluded_ips=excluded_ips,
                 asset_id_handler=asset_id_handler,
                 index=index,
                 ip_hash_to_region=ip_address_to_region,
             )
+            index += 1
         per_buffer_index += index
 
     _save_ip_address_to_region_cache(ip_hash_to_region=ip_address_to_region)
diff --git a/tests/test_dandi_s3_log_parser.py b/tests/test_dandi_s3_log_parser.py
index 0d7e4d0..6baabdf 100644
--- a/tests/test_dandi_s3_log_parser.py
+++ b/tests/test_dandi_s3_log_parser.py
@@ -49,4 +49,49 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local):
     pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
 
 
+def test_parse_dandi_raw_s3_log_example_0_parallel(tmpdir: py.path.local):
+    """
+    Most basic test of parallelized functionality.
+
+    If there are failures in the parsing of any lines found in application,
+    please raise an issue and contribute them to the example log collection.
+ """ + tmpdir = pathlib.Path(tmpdir) + + file_parent = pathlib.Path(__file__).parent + examples_folder_path = file_parent / "examples" / "ordered_example_0" + example_raw_s3_log_file_path = examples_folder_path / "example_dandi_s3_log.log" + expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" + + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + dandi_s3_log_parser.parse_dandi_raw_s3_log( + raw_s3_log_file_path=example_raw_s3_log_file_path, + parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, + number_of_jobs=2, + ) + test_output_file_paths = list(test_parsed_s3_log_folder_path.iterdir()) + + number_of_output_files = len(test_output_file_paths) + assert number_of_output_files != 0, f"Test expected_output folder ({test_parsed_s3_log_folder_path}) is empty!" + + # Increment this over time as more examples are added + expected_number_of_output_files = 2 + assert ( + number_of_output_files == expected_number_of_output_files + ), f"The number of asset files ({number_of_output_files}) does not match expectation!" + + expected_asset_ids = [file_path.stem for file_path in expected_parsed_s3_log_folder_path.iterdir()] + for test_parsed_s3_log_file_path in test_output_file_paths: + assert ( + test_parsed_s3_log_file_path.stem in expected_asset_ids + ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!" + + test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0) + expected_parsed_s3_log_file_path = ( + expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv" + ) + expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0) + pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) + + # TODO: add tests for API and CLI usage of finding random example line from testing submodule