From dc9fd24d35eb51c920d791644d04da5256ca22cd Mon Sep 17 00:00:00 2001 From: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Date: Fri, 9 Aug 2024 23:51:20 -0400 Subject: [PATCH] Various debugs and enhancements (#22) * various debugs and enhancements; more tests * various debugs and enhancements; more tests --------- Co-authored-by: CodyCBakerPhD --- .../_command_line_interface.py | 16 ++ src/dandi_s3_log_parser/_ip_utils.py | 5 +- src/dandi_s3_log_parser/_order_parsed_logs.py | 4 +- .../_s3_log_file_parser.py | 195 +++++++++++------- .../_s3_log_line_parser.py | 6 +- .../example_dandi_s3_log_0.log | 2 + .../example_dandi_s3_log_1.log | 2 + ...s_11ec8933-1456-4942-922b-94e5878bb991.tsv | 3 + ...s_a7b032b8-1e31-429f-975f-52a28cec6629.tsv | 3 + ...s_11ec8933-1456-4942-922b-94e5878bb991.tsv | 10 +- ...s_a7b032b8-1e31-429f-975f-52a28cec6629.tsv | 10 +- tests/test_dandi_s3_log_parser.py | 12 +- 12 files changed, 176 insertions(+), 92 deletions(-) create mode 100644 tests/examples/ordered_example_1/example_dandi_s3_log_0.log create mode 100644 tests/examples/ordered_example_1/example_dandi_s3_log_1.log create mode 100644 tests/examples/ordered_example_1/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv create mode 100644 tests/examples/ordered_example_1/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv diff --git a/src/dandi_s3_log_parser/_command_line_interface.py b/src/dandi_s3_log_parser/_command_line_interface.py index e9b0416..d62a8d1 100644 --- a/src/dandi_s3_log_parser/_command_line_interface.py +++ b/src/dandi_s3_log_parser/_command_line_interface.py @@ -1,6 +1,7 @@ """Call the raw S3 log parser from the command line.""" import collections +import os import pathlib import click from typing import Literal @@ -9,6 +10,8 @@ from .testing._helpers import find_random_example_line from ._config import REQUEST_TYPES +NUMBER_OF_CPU = os.cpu_count() # Note: Not distinguishing if logical or not + @click.command(name="parse_all_dandi_raw_s3_logs") @click.option( @@ -45,12 +48,24 @@ type=str, default=None, ) +@click.option( + "--number_of_jobs", + help="The number of jobs to use for parallel processing.", + required=False, + type=int, + default=1, +) def parse_all_dandi_raw_s3_logs_cli( base_raw_s3_log_folder_path: str, parsed_s3_log_folder_path: str, mode: Literal["w", "a"] = "a", excluded_ips: str | None = None, + number_of_jobs: int = 1, ) -> None: + number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs + assert number_of_jobs > 0, "The number of jobs must be greater than 0." + assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs." 
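+    # Note: a negative value is normalized above following the common convention (-1 maps to all available CPUs, -2 to all but one, and so on)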
+ split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else [] handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None for excluded_ip in split_excluded_ips: @@ -61,6 +76,7 @@ def parse_all_dandi_raw_s3_logs_cli( parsed_s3_log_folder_path=parsed_s3_log_folder_path, mode=mode, excluded_ips=handled_excluded_ips, + number_of_jobs=number_of_jobs, ) diff --git a/src/dandi_s3_log_parser/_ip_utils.py b/src/dandi_s3_log_parser/_ip_utils.py index 0ae8500..4ced1a0 100644 --- a/src/dandi_s3_log_parser/_ip_utils.py +++ b/src/dandi_s3_log_parser/_ip_utils.py @@ -65,7 +65,7 @@ def _save_ip_address_to_region_cache(ip_hash_to_region: dict[str, str]) -> None: yaml.dump(data=ip_hash_to_region, stream=stream) -def _get_region_from_ip_address(ip_hash_to_region: dict[str, str], ip_address: str) -> str: +def _get_region_from_ip_address(ip_hash_to_region: dict[str, str], ip_address: str) -> str | None: """ If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals. @@ -99,6 +99,9 @@ def _get_region_from_ip_address(ip_hash_to_region: dict[str, str], ip_address: s ip_hash_to_region[ip_hash] = region_string return region_string + except ipinfo.exceptions.RequestQuotaExceededError: + # Return the generic 'unknown' but do not cache + return "unknown" except Exception as exception: errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" errors_folder_path.mkdir(exist_ok=True) diff --git a/src/dandi_s3_log_parser/_order_parsed_logs.py b/src/dandi_s3_log_parser/_order_parsed_logs.py index 75fe932..406c63c 100644 --- a/src/dandi_s3_log_parser/_order_parsed_logs.py +++ b/src/dandi_s3_log_parser/_order_parsed_logs.py @@ -9,11 +9,11 @@ def order_parsed_logs( ordered_parsed_s3_log_folder_path.mkdir(exist_ok=True) for unordered_parsed_s3_log_file_path in unordered_parsed_s3_log_folder_path.iterdir(): - unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, index_col=0) + unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, header=0) ordered_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp") # correct index of first column ordered_parsed_s3_log.index = range(len(ordered_parsed_s3_log)) ordered_parsed_s3_log_file_path = ordered_parsed_s3_log_folder_path / unordered_parsed_s3_log_file_path.name - ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t") + ordered_parsed_s3_log.to_csv(path_or_buf=ordered_parsed_s3_log_file_path, sep="\t", header=True, index=True) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index c31e8cf..d78bf47 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -5,13 +5,14 @@ import pathlib import os import shutil +import traceback import uuid from concurrent.futures import ProcessPoolExecutor, as_completed from typing import Callable, Literal +import importlib.metadata import pandas import tqdm -import natsort from ._ip_utils import ( _get_latest_github_ip_ranges, @@ -28,7 +29,6 @@ def parse_all_dandi_raw_s3_logs( *, base_raw_s3_log_folder_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, - mode: Literal["w", "a"] = "a", excluded_ips: collections.defaultdict[str, bool] | None = None, exclude_github_ips: bool = True, number_of_jobs: int = 1, @@ -52,9 +52,6 @@ def parse_all_dandi_raw_s3_logs( 
parsed_s3_log_folder_path : string or pathlib.Path Path to write each parsed S3 log file to. There will be one file per handled asset ID. - mode : "w" or "a", default: "a" - How to resolve the case when files already exist in the folder containing parsed logs. - "w" will overwrite existing content, "a" will append or create if the file does not yet exist. excluded_ips : collections.defaultdict of strings to booleans, optional A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. exclude_github_ips : bool, default: True @@ -71,6 +68,21 @@ def parse_all_dandi_raw_s3_logs( parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) parsed_s3_log_folder_path.mkdir(exist_ok=True) + # Create a fresh temporary directory in the home folder and then fresh subfolders for each job + temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" + temporary_base_folder_path.mkdir(exist_ok=True) + + # Clean up any previous tasks that failed to clean themselves up + for previous_task_folder_path in temporary_base_folder_path.iterdir(): + shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) + + task_id = str(uuid.uuid4())[:5] + temporary_folder_path = temporary_base_folder_path / task_id + temporary_folder_path.mkdir(exist_ok=True) + + temporary_output_folder_path = temporary_folder_path / "output" + temporary_output_folder_path.mkdir(exist_ok=True) + # Re-define some top-level pass-through items here to avoid repeated constructions excluded_ips = excluded_ips or collections.defaultdict(bool) if exclude_github_ips: @@ -81,14 +93,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: split_by_slash = raw_asset_id.split("/") return split_by_slash[0] + "_" + split_by_slash[-1] - daily_raw_s3_log_file_paths = list() - base_folder_paths = [path for path in base_raw_s3_log_folder_path.iterdir() if path.stem.startswith("20")] - yearly_folder_paths = natsort.natsorted(seq=list(base_folder_paths)) - for yearly_folder_path in yearly_folder_paths: - monthly_folder_paths = natsort.natsorted(seq=list(yearly_folder_path.iterdir())) - - for monthly_folder_path in monthly_folder_paths: - daily_raw_s3_log_file_paths.extend(natsort.natsorted(seq=list(monthly_folder_path.glob("*.log")))) + daily_raw_s3_log_file_paths = list(base_raw_s3_log_folder_path.rglob(pattern="*.log")) if number_of_jobs == 1: for raw_s3_log_file_path in tqdm.tqdm( @@ -99,26 +104,16 @@ def asset_id_handler(*, raw_asset_id: str) -> str: ): parse_dandi_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=parsed_s3_log_folder_path, - mode=mode, + parsed_s3_log_folder_path=temporary_output_folder_path, + mode="a", excluded_ips=excluded_ips, exclude_github_ips=False, # Already included in list so avoid repeated construction asset_id_handler=asset_id_handler, tqdm_kwargs=dict(position=1, leave=False), maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + order_results=False, # Will immediately reorder all files at the end ) else: - # Create a fresh temporary directory in the home folder and then fresh subfolders for each job - temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" - temporary_base_folder_path.mkdir(exist_ok=True) - - # Clean up any previous tasks that failed to clean themselves up - for previous_task_folder_path in temporary_base_folder_path.iterdir(): - shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) - - task_id = uuid.uuid4()[:5] - temporary_folder_path = temporary_base_folder_path / task_id 
- temporary_folder_path.mkdir(exist_ok=True) per_job_temporary_folder_paths = list() for job_index in range(number_of_jobs): per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" @@ -136,26 +131,20 @@ def asset_id_handler(*, raw_asset_id: str) -> str: number_of_jobs=number_of_jobs, raw_s3_log_file_path=raw_s3_log_file_path, temporary_folder_path=temporary_folder_path, - mode=mode, excluded_ips=excluded_ips, - exclude_github_ips=False, # Already included in list so avoid repeated construction - asset_id_handler=asset_id_handler, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job, ) ) - # Perform the iteration to trigger processing - for _ in tqdm.tqdm( - iterable=as_completed(daily_raw_s3_log_file_paths), + progress_bar_iterable = tqdm.tqdm( + iterable=as_completed(futures), desc=f"Parsing log files using {number_of_jobs} jobs...", total=len(daily_raw_s3_log_file_paths), position=0, leave=True, - ): - pass - - merged_temporary_folder_path = temporary_folder_path / "merged" - merged_temporary_folder_path.mkdir(exist_ok=True) + ) + for future in progress_bar_iterable: + future.result() # This is the call that finally triggers the deployment to the workers print("\n\nParallel parsing complete!\n\n") @@ -167,6 +156,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str: leave=True, ): per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir()) + assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!" + for per_job_parsed_s3_log_file_path in tqdm.tqdm( iterable=per_job_parsed_s3_log_file_paths, desc="Merging results per job...", @@ -175,17 +166,24 @@ def asset_id_handler(*, raw_asset_id: str) -> str: leave=False, mininterval=1.0, ): - merged_temporary_file_path = merged_temporary_folder_path / per_job_parsed_s3_log_file_path.name + merged_temporary_file_path = temporary_output_folder_path / per_job_parsed_s3_log_file_path.name - parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path, index_col=0) - parsed_s3_log.to_csv(path_or_buf=merged_temporary_file_path, mode="a", sep="\t") + parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path) - order_parsed_logs( - unordered_parsed_s3_log_folder_path=merged_temporary_folder_path, - ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, - ) + header = False if merged_temporary_file_path.exists() else True + parsed_s3_log.to_csv( + path_or_buf=merged_temporary_file_path, mode="a", sep="\t", header=header, index=False + ) - shutil.rmtree(path=temporary_folder_path, ignore_errors=True) + # Always apply this step at the end to be sure we maintained chronological order + # (even if you think order of iteration itself was performed chronologically) + # This step also adds the index counter to the TSV + order_parsed_logs( + unordered_parsed_s3_log_folder_path=temporary_output_folder_path, + ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, + ) + + shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True) return None @@ -195,26 +193,52 @@ def _multi_job_parse_dandi_raw_s3_log( number_of_jobs: int, raw_s3_log_file_path: pathlib.Path, temporary_folder_path: pathlib.Path, - mode: Literal["w", "a"], excluded_ips: collections.defaultdict[str, bool] | None, - exclude_github_ips: bool, - asset_id_handler: Callable | None, maximum_ram_usage_in_bytes: int, ) -> None: - """A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.""" - 
job_index = os.getpid() % number_of_jobs - per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + """ + A mostly pass-through function to calculate the job index on the worker and target the correct subfolder. - parse_dandi_raw_s3_log( - raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=per_job_temporary_folder_path, - mode=mode, - excluded_ips=excluded_ips, - exclude_github_ips=exclude_github_ips, - asset_id_handler=asset_id_handler, - tqdm_kwargs=dict(position=job_index + 1, leave=False), - maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, - ) + Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe) + to a log file. + """ + + try: + error_message = "" + + def asset_id_handler(*, raw_asset_id: str) -> str: + """Apparently callables, even simple built-in ones, cannot be pickled.""" + split_by_slash = raw_asset_id.split("/") + return split_by_slash[0] + "_" + split_by_slash[-1] + + job_index = os.getpid() % number_of_jobs + per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}" + + # Define error catching stuff as part of the try clause + # so that if there is a problem within that, it too is caught + errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" + errors_folder_path.mkdir(exist_ok=True) + + dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") + date = datetime.datetime.now().strftime("%y%m%d") + parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt" + error_message += f"Job index {job_index}/{number_of_jobs} parsing {raw_s3_log_file_path} failed due to\n\n" + + parse_dandi_raw_s3_log( + raw_s3_log_file_path=raw_s3_log_file_path, + parsed_s3_log_folder_path=per_job_temporary_folder_path, + mode="a", + excluded_ips=excluded_ips, + exclude_github_ips=False, # Already included in list so avoid repeated construction + asset_id_handler=asset_id_handler, + tqdm_kwargs=dict(position=job_index + 1, leave=False), + maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + order_results=False, # Always disable this for parallel processing + ) + except Exception as exception: + with open(file=parallel_errors_file_path, mode="a") as io: + error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n" + io.write(error_message) return None @@ -229,6 +253,7 @@ def parse_dandi_raw_s3_log( asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, maximum_ram_usage_in_bytes: int = 4 * 10**9, + order_results: bool = True, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. @@ -267,6 +292,10 @@ def asset_id_handler(*, raw_asset_id: str) -> str: Keyword arguments to pass to the tqdm progress bar. maximum_ram_usage_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + order_results : bool, default: True + Whether to order the results chronologically. + This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after + multiple steps of processing instead of during this operation. 
""" tqdm_kwargs = tqdm_kwargs or dict() @@ -286,7 +315,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: split_by_slash = raw_asset_id.split("/") return split_by_slash[0] + "_" + split_by_slash[-1] - return parse_raw_s3_log( + parse_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, parsed_s3_log_folder_path=parsed_s3_log_folder_path, mode=mode, @@ -296,8 +325,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str: asset_id_handler=asset_id_handler, tqdm_kwargs=tqdm_kwargs, maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes, + order_results=order_results, ) + return None + def parse_raw_s3_log( *, @@ -310,6 +342,7 @@ def parse_raw_s3_log( asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, maximum_ram_usage_in_bytes: int = 4 * 10**9, + order_results: bool = True, ) -> None: """ Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. @@ -350,6 +383,10 @@ def asset_id_handler(*, raw_asset_id: str) -> str: Keyword arguments to pass to the tqdm progress bar. maximum_ram_usage_in_bytes : int, default: 4 GB The theoretical maximum amount of RAM (in bytes) to be used throughout the process. + order_results : bool, default: True + Whether to order the results chronologically. + This is strongly suggested, but a common case of disabling it is if ordering is intended to be applied after + multiple steps of processing instead of during this operation. """ raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) @@ -357,6 +394,21 @@ def asset_id_handler(*, raw_asset_id: str) -> str: excluded_ips = excluded_ips or collections.defaultdict(bool) tqdm_kwargs = tqdm_kwargs or dict() + if order_results is True: + # Create a fresh temporary directory in the home folder and then fresh subfolders for each job + temporary_base_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "temp" + temporary_base_folder_path.mkdir(exist_ok=True) + + # Clean up any previous tasks that failed to clean themselves up + for previous_task_folder_path in temporary_base_folder_path.iterdir(): + shutil.rmtree(path=previous_task_folder_path, ignore_errors=True) + + task_id = str(uuid.uuid4())[:5] + temporary_folder_path = temporary_base_folder_path / task_id + temporary_folder_path.mkdir(exist_ok=True) + temporary_output_folder_path = temporary_folder_path / "output" + temporary_output_folder_path.mkdir(exist_ok=True) + reduced_logs = _get_reduced_log_lines( raw_s3_log_file_path=raw_s3_log_file_path, bucket=bucket, @@ -387,18 +439,21 @@ def asset_id_handler(*, raw_asset_id: str) -> str: reduced_logs_binned_by_asset = reduced_logs_binned_by_unparsed_asset for raw_asset_id, reduced_logs_per_asset in reduced_logs_binned_by_asset.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{raw_asset_id}.tsv" + output_folder_path = temporary_output_folder_path if order_results is True else parsed_s3_log_folder_path + parsed_s3_log_file_path = output_folder_path / f"{raw_asset_id}.tsv" data_frame = pandas.DataFrame(data=reduced_logs_per_asset) - data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t") - progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress" - progress_folder_path.mkdir(exist_ok=True) + header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True + data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False) + + if order_results is True: + order_parsed_logs( + 
unordered_parsed_s3_log_folder_path=temporary_output_folder_path, + ordered_parsed_s3_log_folder_path=parsed_s3_log_folder_path, + ) - date = datetime.datetime.now().strftime("%y%m%d") - progress_file_path = progress_folder_path / f"{date}.txt" - with open(file=progress_file_path, mode="a") as io: - io.write(f"Parsed {raw_s3_log_file_path} successfully!\n") + shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True) return None diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index cc877b0..b752d13 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -18,7 +18,7 @@ import datetime import pathlib import re -from importlib.metadata import version as importlib_version +import importlib.metadata from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH from ._ip_utils import _get_region_from_ip_address @@ -104,12 +104,12 @@ def _get_full_log_line( errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" errors_folder_path.mkdir(exist_ok=True) - dandi_s3_log_parser_version = importlib_version(distribution_name="dandi_s3_log_parser") + dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") date = datetime.datetime.now().strftime("%y%m%d") lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt" with open(file=lines_errors_file_path, mode="a") as io: - io.write(f"Line {index} of {log_file_path} (parsed {number_of_parsed_items} items): {raw_line}") + io.write(f"Line {index} of {log_file_path} (parsed {number_of_parsed_items} items): {raw_line}\n\n") return full_log_line diff --git a/tests/examples/ordered_example_1/example_dandi_s3_log_0.log b/tests/examples/ordered_example_1/example_dandi_s3_log_0.log new file mode 100644 index 0000000..9e9f1ac --- /dev/null +++ b/tests/examples/ordered_example_1/example_dandi_s3_log_0.log @@ -0,0 +1,2 @@ +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [24/Apr/2021:12:03:05 +0000] 192.0.2.0 - NWC7V1KE70QZYJ5Q REST.GET.OBJECT blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629 "GET /blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629?versionId=yn5YAJiwT36Rv78jGYLM71GZumWL.QWn HTTP/1.1" 200 - 1443 1443 35 35 "-" "git-annex/8.20211028-g1c76278" yn5YAJiwT36Rv78jGYLM71GZumWL.QWn ojBg2QLVTSTWsCAe1HoC6IBNLUSPmWH276FdsedhZ/4CQ67DWuZQHcXXB9XUJxYKpnPHpJyBjMM= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [16/Mar/2022:02:21:12 +0000] 192.0.2.0 - J42N2W7ET0EC03CV REST.GET.OBJECT blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 "GET /blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 HTTP/1.1" 206 - 512 171408 53 52 "-" "-" - DX8oFoKQx0o5V3lwEuWBxF5p2fSXrwINj0rnxmas0YgjWuPqYLK/vnW60Txh23K93aahe0IFw2c= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - diff --git a/tests/examples/ordered_example_1/example_dandi_s3_log_1.log b/tests/examples/ordered_example_1/example_dandi_s3_log_1.log new file mode 100644 index 0000000..43bacc6 --- /dev/null +++ b/tests/examples/ordered_example_1/example_dandi_s3_log_1.log @@ -0,0 +1,2 @@ +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [31/Dec/2021:23:06:42 +0000] 192.0.2.0 - NWC7V1KE70QZYJ5Q REST.GET.OBJECT blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629 "GET 
/blobs/a7b/032/a7b032b8-1e31-429f-975f-52a28cec6629?versionId=yn5YAJiwT36Rv78jGYLM71GZumWL.QWn HTTP/1.1" 200 - 1443 1443 35 35 "-" "git-annex/8.20211028-g1c76278" yn5YAJiwT36Rv78jGYLM71GZumWL.QWn ojBg2QLVTSTWsCAe1HoC6IBNLUSPmWH276FdsedhZ/4CQ67DWuZQHcXXB9XUJxYKpnPHpJyBjMM= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - +8787a3c41bf7ce0d54359d9348ad5b08e16bd5bb8ae5aa4e1508b435773a066e dandiarchive [04/May/2022:05:06:35 +0000] 192.0.2.0 - J42N2W7ET0EC03CV REST.GET.OBJECT blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 "GET /blobs/11e/c89/11ec8933-1456-4942-922b-94e5878bb991 HTTP/1.1" 206 - 512 171408 53 52 "-" "-" - DX8oFoKQx0o5V3lwEuWBxF5p2fSXrwINj0rnxmas0YgjWuPqYLK/vnW60Txh23K93aahe0IFw2c= - ECDHE-RSA-AES128-GCM-SHA256 - dandiarchive.s3.amazonaws.com TLSv1.2 - diff --git a/tests/examples/ordered_example_1/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/ordered_example_1/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv new file mode 100644 index 0000000..bce3d14 --- /dev/null +++ b/tests/examples/ordered_example_1/expected_output/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -0,0 +1,3 @@ + timestamp bytes_sent region +0 2022-03-16 02:21:12 512 unknown +1 2022-05-04 05:06:35 512 unknown diff --git a/tests/examples/ordered_example_1/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/ordered_example_1/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv new file mode 100644 index 0000000..17a6317 --- /dev/null +++ b/tests/examples/ordered_example_1/expected_output/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -0,0 +1,3 @@ + timestamp bytes_sent region +0 2021-04-24 12:03:05 1443 unknown +1 2021-12-31 23:06:42 1443 unknown diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv index 931a755..d7bbd96 100644 --- a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_11ec8933-1456-4942-922b-94e5878bb991.tsv @@ -1,5 +1,5 @@ - timestamp bytes_sent region -0 2022-07-01 05:06:35 512 unknown -1 2021-05-21 05:06:35 1234 US/California -2 2022-11-04 05:06:35 141424 US/Virginia -3 2020-02-24 05:06:35 124 CA/Ontario +timestamp bytes_sent region +2022-07-01 05:06:35 512 unknown +2021-05-21 05:06:35 1234 US/California +2022-11-04 05:06:35 141424 US/Virginia +2020-02-24 05:06:35 124 CA/Ontario diff --git a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv index 8e0ac58..d98dfc1 100644 --- a/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv +++ b/tests/examples/unordered_example_0/unordered_parsed_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv @@ -1,5 +1,5 @@ - timestamp bytes_sent region -0 2022-08-05 05:06:35 2141 CH/BeiJing -1 2022-03-21 05:06:35 24323 MX/Zacatecas -2 2019-11-13 05:06:35 4332423 US/Colorado -3 2020-04-16 05:06:35 12313153 unknown +timestamp bytes_sent region +2022-08-05 05:06:35 2141 CH/BeiJing +2022-03-21 05:06:35 24323 MX/Zacatecas +2019-11-13 05:06:35 4332423 US/Colorado +2020-04-16 05:06:35 12313153 unknown diff --git a/tests/test_dandi_s3_log_parser.py 
b/tests/test_dandi_s3_log_parser.py index 6bb8e32..646eee9 100644 --- a/tests/test_dandi_s3_log_parser.py +++ b/tests/test_dandi_s3_log_parser.py @@ -49,14 +49,14 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local): pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) -def parse_all_dandi_raw_s3_logs_example_0(tmpdir: py.path.local): +def test_parse_all_dandi_raw_s3_logs_example_0(tmpdir: py.path.local): tmpdir = pathlib.Path(tmpdir) file_parent = pathlib.Path(__file__).parent - examples_folder_path = file_parent / "examples" / "ordered_example_0" + examples_folder_path = file_parent / "examples" / "ordered_example_1" expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" - test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_1" dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( base_raw_s3_log_folder_path=examples_folder_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path, @@ -86,14 +86,14 @@ def parse_all_dandi_raw_s3_logs_example_0(tmpdir: py.path.local): pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log) -def parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local): +def test_parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local): tmpdir = pathlib.Path(tmpdir) file_parent = pathlib.Path(__file__).parent - examples_folder_path = file_parent / "examples" / "ordered_example_0" + examples_folder_path = file_parent / "examples" / "ordered_example_1" expected_parsed_s3_log_folder_path = examples_folder_path / "expected_output" - test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0" + test_parsed_s3_log_folder_path = tmpdir / "parsed_example_1" dandi_s3_log_parser.parse_all_dandi_raw_s3_logs( base_raw_s3_log_folder_path=examples_folder_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path,
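
For context, a minimal sketch of how the parallelized entry point touched by this patch might be called from Python; the folder paths below are hypothetical, and the parameter names are taken from the parse_all_dandi_raw_s3_logs signature above:

import pathlib

import dandi_s3_log_parser

# Parse every *.log file found under the (hypothetical) raw log folder using two worker processes,
# writing one chronologically ordered TSV per handled asset ID into the (hypothetical) output folder.
dandi_s3_log_parser.parse_all_dandi_raw_s3_logs(
    base_raw_s3_log_folder_path=pathlib.Path("/data/dandi/raw_s3_logs"),
    parsed_s3_log_folder_path=pathlib.Path("/data/dandi/parsed_s3_logs"),
    number_of_jobs=2,
)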