diff --git a/src/dandi_s3_log_parser/_config.py b/src/dandi_s3_log_parser/_config.py
index f7a4eff..090ca76 100644
--- a/src/dandi_s3_log_parser/_config.py
+++ b/src/dandi_s3_log_parser/_config.py
@@ -27,7 +27,8 @@ def get_hash_salt(base_raw_s3_log_folder_path: str | pathlib.Path) -> str:
-    """Calculate the salt (in hexadecimal encoding) used for IP hashing.
+    """
+    Calculate the salt (in hexadecimal encoding) used for IP hashing.
 
     Uses actual data from the first line of the first log file in the raw S3 log folder, which only we have access to.
diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
index f83f43b..b22701e 100644
--- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -35,7 +35,8 @@ def parse_all_dandi_raw_s3_logs(
     maximum_number_of_workers: int = Field(ge=1, le=os.cpu_count(), default=1),
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
 ) -> None:
-    """Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files.
+    """
+    Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files.
 
     Assumes the following folder structure...
@@ -178,7 +179,11 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         header = False if merged_temporary_file_path.exists() else True
         parsed_s3_log.to_csv(
-            path_or_buf=merged_temporary_file_path, mode="a", sep="\t", header=header, index=False,
+            path_or_buf=merged_temporary_file_path,
+            mode="a",
+            sep="\t",
+            header=header,
+            index=False,
         )
 
     print("\n\n")
@@ -194,7 +199,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     shutil.rmtree(path=temporary_base_folder_path, ignore_errors=True)
 
 
-
 # Function cannot be covered because the line calls occur on subprocesses
 # pragma: no cover
 def _multi_job_parse_dandi_raw_s3_log(
@@ -205,7 +209,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     excluded_ips: collections.defaultdict[str, bool] | None,
     maximum_buffer_size_in_bytes: int,
 ) -> None:
-    """A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.
+    """
+    A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.
 
     Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe)
     to a log file.
@@ -250,7 +255,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             io.write(error_message)
 
 
-
 def parse_dandi_raw_s3_log(
     *,
     raw_s3_log_file_path: str | pathlib.Path,
@@ -263,7 +267,8 @@
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
     order_results: bool = True,
 ) -> None:
-    """Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
+    """
+    Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
 
     'Parsing' here means:
       - limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -338,4 +343,3 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
         order_results=order_results,
     )
-
diff --git a/src/dandi_s3_log_parser/_ip_utils.py b/src/dandi_s3_log_parser/_ip_utils.py
index 347c14d..eac3ace 100644
--- a/src/dandi_s3_log_parser/_ip_utils.py
+++ b/src/dandi_s3_log_parser/_ip_utils.py
@@ -62,7 +62,8 @@ def _load_ip_address_to_region_cache(ip_hash_to_region_file_path: FilePath | Non
 
 
 def _save_ip_address_to_region_cache(
-    ip_hash_to_region: dict[str, str], ip_hash_to_region_file_path: FilePath | None = None,
+    ip_hash_to_region: dict[str, str],
+    ip_hash_to_region_file_path: FilePath | None = None,
 ) -> None:
     """Save the IP address to region cache to disk."""
     ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH
@@ -72,7 +73,8 @@ def _save_ip_address_to_region_cache(
 
 
 def _get_region_from_ip_address(ip_hash_to_region: dict[str, str], ip_address: str) -> str | None:
-    """If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals.
+    """
+    If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals.
 
     Instead, identify the generic region of the world the request came from and report that instead.
     """
diff --git a/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py b/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py
index 4dc78dc..ec8db0f 100644
--- a/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py
+++ b/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py
@@ -5,7 +5,8 @@ def order_and_anonymize_parsed_logs(
-    unordered_parsed_s3_log_folder_path: pathlib.Path, ordered_and_anonymized_s3_log_folder_path: pathlib.Path,
+    unordered_parsed_s3_log_folder_path: pathlib.Path,
+    ordered_and_anonymized_s3_log_folder_path: pathlib.Path,
 ) -> None:
     """Order the contents of all parsed log files chronologically."""
     ordered_and_anonymized_s3_log_folder_path.mkdir(exist_ok=True)
@@ -29,5 +30,8 @@ def order_and_anonymize_parsed_logs(
             ordered_and_anonymized_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
         )
         ordered_and_anonymized_parsed_s3_log.to_csv(
-            path_or_buf=ordered_and_anonymized_parsed_s3_log_file_path, sep="\t", header=True, index=True,
+            path_or_buf=ordered_and_anonymized_parsed_s3_log_file_path,
+            sep="\t",
+            header=True,
+            index=True,
         )
diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 4e7754f..22dda22 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -28,7 +28,8 @@ def parse_raw_s3_log(
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
     order_results: bool = True,
 ) -> None:
-    """Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
+    """
+    Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
 
     'Parsing' here means:
       - limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -104,7 +105,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     for reduced_log in reduced_logs:
         raw_asset_id = reduced_log.asset_id
         reduced_logs_binned_by_unparsed_asset[raw_asset_id] = reduced_logs_binned_by_unparsed_asset.get(
-            raw_asset_id, collections.defaultdict(list),
+            raw_asset_id,
+            collections.defaultdict(list),
         )
         reduced_logs_binned_by_unparsed_asset[raw_asset_id]["timestamp"].append(reduced_log.timestamp)
@@ -138,7 +140,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     shutil.rmtree(path=temporary_output_folder_path, ignore_errors=True)
 
 
-
 def _get_reduced_log_lines(
     *,
     raw_s3_log_file_path: pathlib.Path,
@@ -149,7 +150,8 @@
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
     ip_hash_to_region_file_path: pathlib.Path | None,
 ) -> list[_ReducedLogLine]:
-    """Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects.
+    """
+    Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects.
 
     Parameters
     ----------
@@ -181,10 +183,13 @@
     reduced_log_lines = list()
     per_buffer_index = 0
     buffered_text_reader = BufferedTextReader(
-        file_path=raw_s3_log_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
+        file_path=raw_s3_log_file_path,
+        maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
     )
     for buffered_raw_lines in tqdm.tqdm(
-        iterable=buffered_text_reader, total=len(buffered_text_reader), **resolved_tqdm_kwargs,
+        iterable=buffered_text_reader,
+        total=len(buffered_text_reader),
+        **resolved_tqdm_kwargs,
     ):
         index = 0
         for raw_line in buffered_raw_lines:
diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py
index 578c8ae..4809a51 100644
--- a/src/dandi_s3_log_parser/_s3_log_line_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -1,4 +1,5 @@
-"""Primary functions for parsing a single line of a raw S3 log.
+"""
+Primary functions for parsing a single line of a raw S3 log.
 
 The strategy is to...
@@ -72,7 +73,8 @@ def _find_all_possible_substring_indices(*, string: str, substring: str) -> list
 
 
 def _attempt_to_remove_quotes(*, raw_line: str, bad_parsed_line: str) -> str:
-    """Attempt to remove bad quotes from a raw line of an S3 log file.
+    """
+    Attempt to remove bad quotes from a raw line of an S3 log file.
 
     These quotes are not properly escaped and are causing issues with the regex pattern.
     Various attempts to fix the regex failed, so this is the most reliable correction I could find.
@@ -96,7 +98,8 @@ def _attempt_to_remove_quotes(*, raw_line: str, bad_parsed_line: str) -> str:
 
 
 def _parse_s3_log_line(*, raw_line: str) -> list[str]:
-    """The current method of parsing lines of an S3 log file.
+    """
+    The current method of parsing lines of an S3 log file.
 
     Bad lines reported in https://github.com/catalystneuro/dandi_s3_log_parser/issues/18 led to quote scrubbing
     as a pre-step. No self-contained single regex was found that could account for this uncorrected strings.
@@ -165,7 +168,8 @@ def _append_reduced_log_line(
     index: int,
     ip_hash_to_region: dict[str, str],
 ) -> None:
-    """Append the `reduced_log_lines` list with a ReducedLogLine constructed from a single raw log line, if it is valid.
+    """
+    Append the `reduced_log_lines` list with a ReducedLogLine constructed from a single raw log line, if it is valid.
 
     Parameters
     ----------
diff --git a/src/dandi_s3_log_parser/testing/_helpers.py b/src/dandi_s3_log_parser/testing/_helpers.py
index 70bcb32..dfe614f 100644
--- a/src/dandi_s3_log_parser/testing/_helpers.py
+++ b/src/dandi_s3_log_parser/testing/_helpers.py
@@ -14,7 +14,8 @@ def find_random_example_line(
     maximum_lines_per_request_type: int = 5,
     seed: int = 0,
 ) -> str:
-    """Return a randomly chosen line from a folder of raw S3 log files to serve as an example for testing purposes.
+    """
+    Return a randomly chosen line from a folder of raw S3 log files to serve as an example for testing purposes.
 
     Parameters
     ----------
diff --git a/tests/test_buffered_text_reader.py b/tests/test_buffered_text_reader.py
index 98bc6fd..dadb7fa 100644
--- a/tests/test_buffered_text_reader.py
+++ b/tests/test_buffered_text_reader.py
@@ -37,7 +37,8 @@ def single_line_text_file_path(tmp_path_factory: pytest.TempPathFactory):
 def test_buffered_text_reader(large_text_file_path: pathlib.Path):
     maximum_buffer_size_in_bytes = 10**6  # 1 MB
     buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
-        file_path=large_text_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
+        file_path=large_text_file_path,
+        maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
     )
 
     assert iter(buffered_text_reader) is buffered_text_reader, "BufferedTextReader object is not iterable!"
@@ -58,7 +59,8 @@ def test_value_error(single_line_text_file_path: pathlib.Path):
     maximum_buffer_size_in_bytes = 10**6  # 1 MB
     with pytest.raises(ValueError) as error_info:
         buffered_text_reader = dandi_s3_log_parser.BufferedTextReader(
-            file_path=single_line_text_file_path, maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
+            file_path=single_line_text_file_path,
+            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
         )
         next(buffered_text_reader)
diff --git a/tests/test_order_and_anonymize.py b/tests/test_order_and_anonymize.py
index 4efa422..ab1d6ef 100644
--- a/tests/test_order_and_anonymize.py
+++ b/tests/test_order_and_anonymize.py
@@ -25,11 +25,14 @@ def test_order_and_anonymize(tmpdir: py.path.local) -> None:
     expected_ordered_and_anonymized_s3_log_file_path = expected_output_folder_path / parsed_log_file_name
 
     test_ordered_and_anonymized_s3_log = pandas.read_table(
-        filepath_or_buffer=test_ordered_and_anonymized_s3_log_file_path, index_col=0,
+        filepath_or_buffer=test_ordered_and_anonymized_s3_log_file_path,
+        index_col=0,
     )
     expected_ordered_and_anonymized_s3_log = pandas.read_table(
-        filepath_or_buffer=expected_ordered_and_anonymized_s3_log_file_path, index_col=0,
+        filepath_or_buffer=expected_ordered_and_anonymized_s3_log_file_path,
+        index_col=0,
     )
     pandas.testing.assert_frame_equal(
-        left=test_ordered_and_anonymized_s3_log, right=expected_ordered_and_anonymized_s3_log,
+        left=test_ordered_and_anonymized_s3_log,
+        right=expected_ordered_and_anonymized_s3_log,
     )
diff --git a/tests/test_parse_dandi_raw_s3_log.py b/tests/test_parse_dandi_raw_s3_log.py
index 21b3b71..3bb4107 100644
--- a/tests/test_parse_dandi_raw_s3_log.py
+++ b/tests/test_parse_dandi_raw_s3_log.py
@@ -7,7 +7,8 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local):
-    """Most basic test of functionality.
+    """
+    Most basic test of functionality.
 
     If there are failures in the parsing of any lines found in application,
     please raise an issue and contribute them to the example log collection.
@@ -21,7 +22,8 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local):
     test_parsed_s3_log_folder_path = tmpdir / "parsed_example_0"
     dandi_s3_log_parser.parse_dandi_raw_s3_log(
-        raw_s3_log_file_path=example_raw_s3_log_file_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path,
+        raw_s3_log_file_path=example_raw_s3_log_file_path,
+        parsed_s3_log_folder_path=test_parsed_s3_log_folder_path,
     )
 
     test_output_file_paths = [path for path in test_parsed_s3_log_folder_path.iterdir() if path.is_file()]
diff --git a/tests/test_parse_dandi_raw_s3_log_bad_lines.py b/tests/test_parse_dandi_raw_s3_log_bad_lines.py
index 1868f8b..c982caa 100644
--- a/tests/test_parse_dandi_raw_s3_log_bad_lines.py
+++ b/tests/test_parse_dandi_raw_s3_log_bad_lines.py
@@ -7,8 +7,7 @@ def test_parse_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local):
-    """'ordered_example_2' contains the basic test cases as well as a collection of 'bad lines' contributed over time.
-    """
+    """'ordered_example_2' contains the basic test cases as well as a collection of 'bad lines' contributed over time."""
     tmpdir = pathlib.Path(tmpdir)
 
     # Count initial error folder contents
@@ -23,7 +22,8 @@ def test_parse_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local):
     test_parsed_s3_log_folder_path = tmpdir / "parsed_example_2"
     dandi_s3_log_parser.parse_dandi_raw_s3_log(
-        raw_s3_log_file_path=example_raw_s3_log_file_path, parsed_s3_log_folder_path=test_parsed_s3_log_folder_path,
+        raw_s3_log_file_path=example_raw_s3_log_file_path,
+        parsed_s3_log_folder_path=test_parsed_s3_log_folder_path,
    )

    test_output_file_paths = [path for path in test_parsed_s3_log_folder_path.iterdir() if path.is_file()]