diff --git a/README.md b/README.md
index dfed567..eccec89 100644
--- a/README.md
+++ b/README.md
@@ -52,7 +52,7 @@ For example, on Drogon:
 parse_all_dandi_raw_s3_logs \
   --base_raw_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs \
   --parsed_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
-  --excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log \
+  --excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log,/mnt/backup/dandi/dandiarchive-logs/2024/05/.git/gc.log \
   --excluded_ips < Drogons IP > \
   --maximum_number_of_workers 6 \
   --maximum_buffer_size_in_mb 5000
diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py
index 28c3ba3..0fe2b89 100644
--- a/src/dandi_s3_log_parser/__init__.py
+++ b/src/dandi_s3_log_parser/__init__.py
@@ -3,18 +3,16 @@
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
 from ._s3_log_file_parser import parse_raw_s3_log
 from ._buffered_text_reader import BufferedTextReader
-from ._order_and_anonymize_parsed_logs import order_and_anonymize_parsed_logs
 from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
-from ._ip_utils import get_hash_salt
-from ._log_utils import find_all_known_operation_types
+from ._ip_utils import get_region_from_ip_address
+from ._dandiset_mapper import map_reduced_logs_to_all_dandisets
 
 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
-    "BufferedTextReader",
-    "get_hash_salt",
     "parse_raw_s3_log",
+    "BufferedTextReader",
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
-    "order_and_anonymize_parsed_logs",
-    "find_all_known_operation_types",
+    "get_region_from_ip_address",
+    "map_reduced_logs_to_all_dandisets",
 ]
diff --git a/src/dandi_s3_log_parser/_dandiset_mapper.py b/src/dandi_s3_log_parser/_dandiset_mapper.py
new file mode 100644
index 0000000..69f3d4e
--- /dev/null
+++ b/src/dandi_s3_log_parser/_dandiset_mapper.py
@@ -0,0 +1,96 @@
+import os
+import pathlib
+
+import dandi.dandiapi
+import pandas
+import tqdm
+from pydantic import DirectoryPath, validate_call
+
+from ._ip_utils import _load_ip_hash_to_region_cache, get_region_from_ip_address
+
+
+@validate_call
+def map_reduced_logs_to_all_dandisets(
+    reduced_s3_log_folder_path: DirectoryPath, all_dandiset_logs_folder_path: DirectoryPath
+) -> None:
+    if "IPINFO_CREDENTIALS" not in os.environ:
+        message = "The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!"
+        raise ValueError(message)  # pragma: no cover
+
+    if "IP_HASH_SALT" not in os.environ:
+        message = (
+            "The environment variable 'IP_HASH_SALT' must be set to import `dandi_s3_log_parser`! "
+            "To retrieve the value, set a temporary value to this environment variable "
+            "and then use the `get_hash_salt` helper function and set it to the correct value."
+        )
+        raise ValueError(message)  # pragma: no cover
+
+    client = dandi.dandiapi.DandiAPIClient()
+
+    ip_hash_to_region = _load_ip_hash_to_region_cache()
+    current_dandisets = list(client.get_dandisets())
+    for dandiset in tqdm.tqdm(
+        iterable=current_dandisets,
+        total=len(current_dandisets),
+        desc="Mapping reduced logs to Dandisets...",
+        position=0,
+        mininterval=5.0,
+        smoothing=0,
+    ):
+        _map_reduced_logs_to_dandiset(
+            dandiset=dandiset,
+            reduced_s3_log_folder_path=reduced_s3_log_folder_path,
+            all_dandiset_logs_folder_path=all_dandiset_logs_folder_path,
+            client=client,
+            ip_hash_to_region=ip_hash_to_region,
+        )
+
+
+def _map_reduced_logs_to_dandiset(
+    dandiset: dandi.dandiapi.RemoteDandiset,
+    reduced_s3_log_folder_path: pathlib.Path,
+    all_dandiset_logs_folder_path: pathlib.Path,
+    client: dandi.dandiapi.DandiAPIClient,
+    ip_hash_to_region: dict[str, str],
+) -> None:
+    dandiset_id = dandiset.identifier
+
+    dandiset_log_folder_path = all_dandiset_logs_folder_path / dandiset_id
+
+    for version in dandiset.get_versions():
+        version_id = version.identifier
+
+        dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)
+
+        all_reduced_logs = []
+        for asset in dandiset_version.get_assets():
+            asset_id = asset.identifier
+            asset_suffixes = pathlib.Path(asset.path).suffixes
+
+            blob_or_zarr = "blobs" if ".zarr" not in asset_suffixes else "zarr"
+
+            reduced_log_file_path = reduced_s3_log_folder_path / f"{blob_or_zarr}_{asset_id}.tsv"
+
+            if not reduced_log_file_path.exists():
+                continue  # No reduced logs found (possible asset was never accessed); skip to next asset
+
+            reduced_log = pandas.read_table(filepath_or_buffer=reduced_log_file_path, header=0)
+            reduced_log["asset_id"] = [asset_id] * len(reduced_log)
+            reduced_log["region"] = [
+                get_region_from_ip_address(ip_address=ip_address, ip_hash_to_region=ip_hash_to_region)
+                for ip_address in reduced_log["ip_address"]
+            ]
+
+            reordered_reduced_log = reduced_log.reindex(columns=("asset_id", "timestamp", "bytes_sent", "region"))
+            all_reduced_logs.append(reordered_reduced_log)
+
+        if len(all_reduced_logs) == 0:
+            continue  # No reduced logs found (possible dandiset version was never accessed); skip to next version
+
+        mapped_log = pandas.concat(objs=all_reduced_logs, ignore_index=True)
+        mapped_log = mapped_log.sort_values(by="timestamp")
+        mapped_log.index = range(len(mapped_log))
+
+        dandiset_log_folder_path.mkdir(exist_ok=True)
+        version_file_path = dandiset_log_folder_path / f"{version_id}.tsv"
+        mapped_log.to_csv(version_file_path, mode="w", sep="\t", header=True, index=True)
diff --git a/src/dandi_s3_log_parser/_ip_utils.py b/src/dandi_s3_log_parser/_ip_utils.py
index 3aeaeb8..6ba8700 100644
--- a/src/dandi_s3_log_parser/_ip_utils.py
+++ b/src/dandi_s3_log_parser/_ip_utils.py
@@ -5,7 +5,6 @@
 import importlib.metadata
 import ipaddress
 import os
-import pathlib
 import traceback
 
 import ipinfo
@@ -19,85 +18,7 @@
 )
 
 
-def get_hash_salt(base_raw_s3_log_folder_path: FilePath) -> str:
-    """
-    Calculate the salt (in hexadecimal encoding) used for IP hashing.
-
-    Uses actual data from the first line of the first log file in the raw S3 log folder, which only we have access to.
-
-    Otherwise, it would be fairly easy to iterate over every possible IP address and find the SHA1 of it.
-    """
-    base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
-
-    # Retrieve the first line of the first log file (which only we know) and use that as a secure salt
-    first_log_file_path = base_raw_s3_log_folder_path / "2019" / "10" / "01.log"
-
-    with open(file=first_log_file_path) as io:
-        first_line = io.readline()
-
-    hash_salt = hashlib.sha1(string=bytes(first_line, "utf-8"))
-
-    return hash_salt.hexdigest()
-
-
-def _cidr_address_to_ip_range(*, cidr_address: str) -> list[str]:
-    """Convert a CIDR address to a list of IP addresses."""
-    cidr_address_class = type(ipaddress.ip_address(cidr_address.split("/")[0]))
-    ip_address_range = []
-    if cidr_address_class is ipaddress.IPv4Address:
-        ip_address_range = ipaddress.IPv4Network(address=cidr_address)
-    elif cidr_address_class is ipaddress.IPv6Address:  # pragma: no cover
-        ip_address_range = ipaddress.IPv6Network(address=cidr_address)
-
-    return [str(ip_address) for ip_address in ip_address_range]
-
-
-def _get_latest_github_ip_ranges() -> list[str]:
-    """Retrieve the latest GitHub CIDR ranges from their API and expand them into a list of IP addresses."""
-    github_ip_request = requests.get("https://api.github.com/meta").json()
-
-    skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"]
-    keys = set(github_ip_request.keys()) - set(skip_keys)
-    github_cidr_addresses = [
-        cidr_address for key in keys for cidr_address in github_ip_request[key] if "::" not in cidr_address  # Skip IPv6
-    ]
-
-    all_github_ips = [
-        str(ip_address)
-        for cidr_address in github_cidr_addresses
-        for ip_address in _cidr_address_to_ip_range(cidr_address=cidr_address)
-    ]
-
-    return all_github_ips
-
-
-def _load_ip_hash_to_region_cache() -> dict[str, str]:
-    """Load the IP hash to region cache from disk."""
-    if not _IP_HASH_TO_REGION_FILE_PATH.exists():
-        return {}  # pragma: no cover
-
-    with open(file=_IP_HASH_TO_REGION_FILE_PATH) as stream:
-        return yaml.load(stream=stream, Loader=yaml.SafeLoader)
-
-
-def _save_ip_hash_to_region_cache(*, ip_hash_to_region: dict[str, str]) -> None:
-    """Save the IP hash to region cache to disk."""
-    with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="w") as stream:
-        yaml.dump(data=ip_hash_to_region, stream=stream)
-
-
-def _save_ip_address_to_region_cache(
-    ip_hash_to_region: dict[str, str],
-    ip_hash_to_region_file_path: FilePath | None = None,
-) -> None:
-    """Save the IP address to region cache to disk."""
-    ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH
-
-    with open(file=ip_hash_to_region_file_path, mode="w") as stream:
-        yaml.dump(data=ip_hash_to_region, stream=stream)
-
-
-def _get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, str]) -> str | None:
+def get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, str]) -> str | None:
     """
     If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals.
 
@@ -168,3 +89,60 @@ def _get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, st
             )
 
         return "unknown"
+
+
+def _cidr_address_to_ip_range(*, cidr_address: str) -> list[str]:
+    """Convert a CIDR address to a list of IP addresses."""
+    cidr_address_class = type(ipaddress.ip_address(cidr_address.split("/")[0]))
+    ip_address_range = []
+    if cidr_address_class is ipaddress.IPv4Address:
+        ip_address_range = ipaddress.IPv4Network(address=cidr_address)
+    elif cidr_address_class is ipaddress.IPv6Address:  # pragma: no cover
+        ip_address_range = ipaddress.IPv6Network(address=cidr_address)
+
+    return [str(ip_address) for ip_address in ip_address_range]
+
+
+def _get_latest_github_ip_ranges() -> list[str]:
+    """Retrieve the latest GitHub CIDR ranges from their API and expand them into a list of IP addresses."""
+    github_ip_request = requests.get("https://api.github.com/meta").json()
+
+    skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"]
+    keys = set(github_ip_request.keys()) - set(skip_keys)
+    github_cidr_addresses = [
+        cidr_address for key in keys for cidr_address in github_ip_request[key] if "::" not in cidr_address  # Skip IPv6
+    ]
+
+    all_github_ips = [
+        str(ip_address)
+        for cidr_address in github_cidr_addresses
+        for ip_address in _cidr_address_to_ip_range(cidr_address=cidr_address)
+    ]
+
+    return all_github_ips
+
+
+def _load_ip_hash_to_region_cache() -> dict[str, str]:
+    """Load the IP hash to region cache from disk."""
+    if not _IP_HASH_TO_REGION_FILE_PATH.exists():
+        return {}  # pragma: no cover
+
+    with open(file=_IP_HASH_TO_REGION_FILE_PATH) as stream:
+        return yaml.load(stream=stream, Loader=yaml.SafeLoader)
+
+
+def _save_ip_hash_to_region_cache(*, ip_hash_to_region: dict[str, str]) -> None:
+    """Save the IP hash to region cache to disk."""
+    with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="w") as stream:
+        yaml.dump(data=ip_hash_to_region, stream=stream)
+
+
+def _save_ip_address_to_region_cache(
+    ip_hash_to_region: dict[str, str],
+    ip_hash_to_region_file_path: FilePath | None = None,
+) -> None:
+    """Save the IP address to region cache to disk."""
+    ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH
+
+    with open(file=ip_hash_to_region_file_path, mode="w") as stream:
+        yaml.dump(data=ip_hash_to_region, stream=stream)
diff --git a/src/dandi_s3_log_parser/_log_utils.py b/src/dandi_s3_log_parser/_log_utils.py
deleted file mode 100644
index 2b5a1aa..0000000
--- a/src/dandi_s3_log_parser/_log_utils.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import pathlib
-import random
-
-import tqdm
-from pydantic import DirectoryPath, FilePath, validate_call
-
-from ._buffered_text_reader import BufferedTextReader
-
-
-@validate_call
-def find_all_known_operation_types(
-    base_raw_s3_log_folder_path: DirectoryPath,
-    excluded_log_files: list[FilePath] | None,
-    max_files: int | None = None,
-) -> set:
-    base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
-    excluded_log_files = excluded_log_files or {}
-    excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
-
-    daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files)
-    random.shuffle(daily_raw_s3_log_file_paths)
-
-    unique_operation_types = set()
-    for raw_s3_log_file_path in tqdm.tqdm(
-        iterable=daily_raw_s3_log_file_paths[:max_files],
-        desc="Extracting operation types from log files...",
-        position=0,
-        leave=True,
-        smoothing=0,
-    ):
-        operation_types_per_file = {
-            raw_log_line.split(" ")[7]
-            for buffered_text_reader in BufferedTextReader(file_path=raw_s3_log_file_path)
-            for raw_log_line in buffered_text_reader
-        }
-
-        unique_operation_types.update(operation_types_per_file)
-
-    return unique_operation_types
diff --git a/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py b/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py
deleted file mode 100644
index 9ad7abc..0000000
--- a/src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import pathlib
-
-import pandas
-import tqdm
-
-from ._ip_utils import _get_region_from_ip_address, _load_ip_hash_to_region_cache
-
-
-def order_and_anonymize_parsed_logs(
-    *,
-    unordered_parsed_s3_log_folder_path: pathlib.Path,
-    ordered_and_anonymized_s3_log_folder_path: pathlib.Path,
-) -> None:
-    """Order the contents of all parsed log files chronologically."""
-    ordered_and_anonymized_s3_log_folder_path.mkdir(exist_ok=True)
-
-    unordered_file_paths = list(unordered_parsed_s3_log_folder_path.glob("*.tsv"))
-    for unordered_parsed_s3_log_file_path in tqdm.tqdm(
-        iterable=unordered_file_paths,
-        total=len(unordered_file_paths),
-        desc="Ordering parsed logs...",
-        position=0,
-        leave=True,
-        mininterval=3.0,
-    ):
-        unordered_parsed_s3_log = pandas.read_table(filepath_or_buffer=unordered_parsed_s3_log_file_path, header=0)
-        ordered_and_anonymized_parsed_s3_log = unordered_parsed_s3_log.sort_values(by="timestamp")
-
-        # Correct index of first column
-        ordered_and_anonymized_parsed_s3_log.index = range(len(unordered_parsed_s3_log))
-
-        # Map IP addresses to region
-        ip_hash_to_region = _load_ip_hash_to_region_cache()
-        ordered_and_anonymized_parsed_s3_log["region"] = [
-            _get_region_from_ip_address(ip_address=ip_address, ip_hash_to_region=ip_hash_to_region)
-            for ip_address in ordered_and_anonymized_parsed_s3_log["ip_address"]
-        ]
-        del ordered_and_anonymized_parsed_s3_log["ip_address"]
-
-        ordered_and_anonymized_parsed_s3_log_file_path = (
-            ordered_and_anonymized_s3_log_folder_path / unordered_parsed_s3_log_file_path.name
-        )
-        ordered_and_anonymized_parsed_s3_log.to_csv(
-            path_or_buf=ordered_and_anonymized_parsed_s3_log_file_path,
-            sep="\t",
-            header=True,
-            index=True,
-        )
diff --git a/src/dandi_s3_log_parser/testing/_helpers.py b/src/dandi_s3_log_parser/testing/_helpers.py
index dfe614f..48cd230 100644
--- a/src/dandi_s3_log_parser/testing/_helpers.py
+++ b/src/dandi_s3_log_parser/testing/_helpers.py
@@ -1,10 +1,15 @@
 """Collection of helper functions related to testing and generating of example lines."""
 
 import collections
+import hashlib
 import pathlib
 import random
 from typing import Literal
 
+import tqdm
+from pydantic import DirectoryPath, FilePath, validate_call
+
+from .._buffered_text_reader import BufferedTextReader
 from .._config import REQUEST_TYPES
 
 
@@ -103,3 +108,56 @@ def find_random_example_line(
     anonymized_random_line = " ".join(random_line_items)
 
     return anonymized_random_line
+
+
+def get_hash_salt(base_raw_s3_log_folder_path: FilePath) -> str:
+    """
+    Calculate the salt (in hexadecimal encoding) used for IP hashing.
+
+    Uses actual data from the first line of the first log file in the raw S3 log folder, which only we have access to.
+
+    Otherwise, it would be fairly easy to iterate over every possible IP address and find the SHA1 of it.
+    """
+    base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
+
+    # Retrieve the first line of the first log file (which only we know) and use that as a secure salt
+    first_log_file_path = base_raw_s3_log_folder_path / "2019" / "10" / "01.log"
+
+    with open(file=first_log_file_path) as io:
+        first_line = io.readline()
+
+    hash_salt = hashlib.sha1(string=bytes(first_line, "utf-8"))
+
+    return hash_salt.hexdigest()
+
+
+@validate_call
+def find_all_known_operation_types(
+    base_raw_s3_log_folder_path: DirectoryPath,
+    excluded_log_files: list[FilePath] | None,
+    max_files: int | None = None,
+) -> set:
+    base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
+    excluded_log_files = excluded_log_files or {}
+    excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
+
+    daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files)
+    random.shuffle(daily_raw_s3_log_file_paths)
+
+    unique_operation_types = set()
+    for raw_s3_log_file_path in tqdm.tqdm(
+        iterable=daily_raw_s3_log_file_paths[:max_files],
+        desc="Extracting operation types from log files...",
+        position=0,
+        leave=True,
+        smoothing=0,
+    ):
+        operation_types_per_file = {
+            raw_log_line.split(" ")[7]
+            for buffered_text_reader in BufferedTextReader(file_path=raw_s3_log_file_path)
+            for raw_log_line in buffered_text_reader
+        }
+
+        unique_operation_types.update(operation_types_per_file)
+
+    return unique_operation_types
diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv
new file mode 100644
index 0000000..c045fef
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-03-16 02:21:12	512	unknown
+1	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-05-04 05:06:35	512	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv
new file mode 100644
index 0000000..c045fef
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-03-16 02:21:12	512	unknown
+1	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-05-04 05:06:35	512	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv
new file mode 100644
index 0000000..c045fef
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-03-16 02:21:12	512	unknown
+1	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-05-04 05:06:35	512	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv
new file mode 100644
index 0000000..041db08
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	cbcf1d6d-7f64-4d1f-8692-75e09e177ca6	2021-04-24 12:03:05	1443	unknown
+1	cbcf1d6d-7f64-4d1f-8692-75e09e177ca6	2021-12-31 23:06:42	1443	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv
new file mode 100644
index 0000000..041db08
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	cbcf1d6d-7f64-4d1f-8692-75e09e177ca6	2021-04-24 12:03:05	1443	unknown
+1	cbcf1d6d-7f64-4d1f-8692-75e09e177ca6	2021-12-31 23:06:42	1443	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_5e9e92e1-f044-4aa0-ab47-1cfcb8899348.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_5e9e92e1-f044-4aa0-ab47-1cfcb8899348.tsv
new file mode 100644
index 0000000..570c480
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_5e9e92e1-f044-4aa0-ab47-1cfcb8899348.tsv
@@ -0,0 +1,3 @@
+timestamp	bytes_sent	ip_address	line_index
+2022-03-16 02:21:12	512	192.0.2.0	1
+2022-05-04 05:06:35	512	192.0.2.0	1
diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv
new file mode 100644
index 0000000..c045fef
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv
@@ -0,0 +1,3 @@
+	asset_id	timestamp	bytes_sent	region
+0	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-03-16 02:21:12	512	unknown
+1	5e9e92e1-f044-4aa0-ab47-1cfcb8899348	2022-05-04 05:06:35	512	unknown
diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cbcf1d6d-7f64-4d1f-8692-75e09e177ca6.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cbcf1d6d-7f64-4d1f-8692-75e09e177ca6.tsv
new file mode 100644
index 0000000..0851ae8
--- /dev/null
+++ b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cbcf1d6d-7f64-4d1f-8692-75e09e177ca6.tsv
@@ -0,0 +1,3 @@
+timestamp	bytes_sent	ip_address	line_index
+2021-04-24 12:03:05	1443	192.0.2.0	0
+2021-12-31 23:06:42	1443	192.0.2.0	0
diff --git a/tests/test_map_reduced_logs_to_all_dandisets.py b/tests/test_map_reduced_logs_to_all_dandisets.py
new file mode 100644
index 0000000..8a2a10c
--- /dev/null
+++ b/tests/test_map_reduced_logs_to_all_dandisets.py
@@ -0,0 +1,55 @@
+import pathlib
+
+import pandas
+import py
+
+import dandi_s3_log_parser
+
+
+def test_map_reduced_logs_to_all_dandisets(tmpdir: py.path.local):
+    tmpdir = pathlib.Path(tmpdir)
+
+    file_parent = pathlib.Path(__file__).parent
+    examples_folder_path = file_parent / "examples" / "mapped_to_dandiset_example_0"
+    reduced_s3_log_folder_path = examples_folder_path / "reduced_logs"
+    all_dandiset_logs_folder_path = tmpdir
+
+    dandi_s3_log_parser.map_reduced_logs_to_all_dandisets(
+        reduced_s3_log_folder_path=reduced_s3_log_folder_path,
+        all_dandiset_logs_folder_path=all_dandiset_logs_folder_path,
+    )
+
+    expected_output_folder_path = examples_folder_path / "expected_output"
+
+    # Ensure no extra folders were created
+    test_dandiset_id_folder_paths = [
+        dandiset_id_folder_path.stem for dandiset_id_folder_path in all_dandiset_logs_folder_path.iterdir()
+    ]
+    expected_dandiset_id_folder_paths = [
+        dandiset_id_folder_path.stem for dandiset_id_folder_path in expected_output_folder_path.iterdir()
+    ]
+    assert set(test_dandiset_id_folder_paths) == set(expected_dandiset_id_folder_paths)
+
+    test_dandiset_version_id_file_paths = {
+        f"{version_id_file_path.parent.name}/{version_id_file_path.name}": version_id_file_path
+        for dandiset_id_folder_path in all_dandiset_logs_folder_path.iterdir()
+        for version_id_file_path in dandiset_id_folder_path.iterdir()
+    }
+    expected_dandiset_version_id_file_paths = {
+        f"{version_id_file_path.parent.name}/{version_id_file_path.name}": version_id_file_path
+        for dandiset_id_folder_path in expected_output_folder_path.iterdir()
+        for version_id_file_path in dandiset_id_folder_path.iterdir()
+    }
+    assert set(test_dandiset_version_id_file_paths.keys()) == set(expected_dandiset_version_id_file_paths.keys())
+
+    for expected_version_id_file_path in expected_dandiset_version_id_file_paths.values():
+        test_version_id_file_path = (
+            all_dandiset_logs_folder_path
+            / expected_version_id_file_path.parent.name
+            / expected_version_id_file_path.name
+        )
+
+        test_mapped_log = pandas.read_table(filepath_or_buffer=test_version_id_file_path, index_col=0)
+        expected_mapped_log = pandas.read_table(filepath_or_buffer=expected_version_id_file_path, index_col=0)
+
+        pandas.testing.assert_frame_equal(left=test_mapped_log, right=expected_mapped_log)
diff --git a/tests/test_order_and_anonymize.py b/tests/test_order_and_anonymize.py
deleted file mode 100644
index 45bd0df..0000000
--- a/tests/test_order_and_anonymize.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import pathlib
-
-import pandas
-import py
-
-import dandi_s3_log_parser
-
-
-def test_order_and_anonymize(tmpdir: py.path.local) -> None:
-    """Basic test for ordering and anonymizing parsed S3 logs."""
-    tmpdir = pathlib.Path(tmpdir)
-
-    unordered_example_base_folder_path = pathlib.Path(__file__).parent / "examples" / "order_and_anonymize_example_0"
-    unordered_parsed_s3_log_folder_path = unordered_example_base_folder_path / "unordered_parsed_logs"
-    ordered_and_anonymized_s3_log_folder_path = tmpdir
-
-    dandi_s3_log_parser.order_and_anonymize_parsed_logs(
-        unordered_parsed_s3_log_folder_path=unordered_parsed_s3_log_folder_path,
-        ordered_and_anonymized_s3_log_folder_path=ordered_and_anonymized_s3_log_folder_path,
-    )
-
-    parsed_log_file_stems = [path.name for path in unordered_parsed_s3_log_folder_path.iterdir()]
-    expected_output_folder_path = unordered_example_base_folder_path / "expected_output"
-    for parsed_log_file_name in parsed_log_file_stems:
-        test_ordered_and_anonymized_s3_log_file_path = ordered_and_anonymized_s3_log_folder_path / parsed_log_file_name
-        expected_ordered_and_anonymized_s3_log_file_path = expected_output_folder_path / parsed_log_file_name
-
-        test_ordered_and_anonymized_s3_log = pandas.read_table(
-            filepath_or_buffer=test_ordered_and_anonymized_s3_log_file_path,
-            index_col=0,
-        )
-        expected_ordered_and_anonymized_s3_log = pandas.read_table(
-            filepath_or_buffer=expected_ordered_and_anonymized_s3_log_file_path,
-            index_col=0,
-        )
-
-        test_ordered_and_anonymized_s3_log = test_ordered_and_anonymized_s3_log.sort_values(by="timestamp")
-        expected_ordered_and_anonymized_s3_log = expected_ordered_and_anonymized_s3_log.sort_values(by="timestamp")
-
-        pandas.testing.assert_frame_equal(
-            left=test_ordered_and_anonymized_s3_log,
-            right=expected_ordered_and_anonymized_s3_log,
-        )
diff --git a/tests/test_parse_all_dandi_raw_s3_logs.py b/tests/test_parse_all_dandi_raw_s3_logs.py
index 5dc7301..924cc5c 100644
--- a/tests/test_parse_all_dandi_raw_s3_logs.py
+++ b/tests/test_parse_all_dandi_raw_s3_logs.py
@@ -38,11 +38,11 @@ def test_parse_all_dandi_raw_s3_logs_example_1(tmpdir: py.path.local) -> None:
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"
 
-        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0)
+        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)
         expected_parsed_s3_log_file_path = (
             expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
         )
-        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0)
+        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)
 
         test_parsed_s3_log = test_parsed_s3_log.sort_values(by="timestamp")
         expected_parsed_s3_log = expected_parsed_s3_log.sort_values(by="timestamp")
diff --git a/tests/test_parse_all_dandi_raw_s3_logs_parallel.py b/tests/test_parse_all_dandi_raw_s3_logs_parallel.py
index f1a828e..ee93a30 100644
--- a/tests/test_parse_all_dandi_raw_s3_logs_parallel.py
+++ b/tests/test_parse_all_dandi_raw_s3_logs_parallel.py
@@ -39,15 +39,18 @@ def test_parse_all_dandi_raw_s3_logs_example_0_parallel(tmpdir: py.path.local) -
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"
 
-        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0)
+        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)
         expected_parsed_s3_log_file_path = (
             expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
         )
-        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0)
+        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)
 
         test_parsed_s3_log = test_parsed_s3_log.sort_values(by="timestamp")
         expected_parsed_s3_log = expected_parsed_s3_log.sort_values(by="timestamp")
 
+        test_parsed_s3_log.index = range(len(test_parsed_s3_log))
+        expected_parsed_s3_log.index = range(len(expected_parsed_s3_log))
+
         pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
 
 
diff --git a/tests/test_parse_dandi_raw_s3_log.py b/tests/test_parse_dandi_raw_s3_log.py
index 465a8ff..7225615 100644
--- a/tests/test_parse_dandi_raw_s3_log.py
+++ b/tests/test_parse_dandi_raw_s3_log.py
@@ -44,9 +44,9 @@ def test_parse_dandi_raw_s3_log_example_0(tmpdir: py.path.local) -> None:
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"
 
-        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0)
+        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)
         expected_parsed_s3_log_file_path = (
             expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
         )
-        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0)
+        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)
         pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
diff --git a/tests/test_parse_dandi_raw_s3_log_bad_lines.py b/tests/test_parse_dandi_raw_s3_log_bad_lines.py
index 1e2189b..a9a6ff6 100644
--- a/tests/test_parse_dandi_raw_s3_log_bad_lines.py
+++ b/tests/test_parse_dandi_raw_s3_log_bad_lines.py
@@ -39,11 +39,11 @@ def test_parse_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None:
             test_parsed_s3_log_file_path.stem in expected_asset_ids
         ), f"Asset ID {test_parsed_s3_log_file_path.stem} not found in expected asset IDs!"
 
-        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path, index_col=0)
+        test_parsed_s3_log = pandas.read_table(filepath_or_buffer=test_parsed_s3_log_file_path)
         expected_parsed_s3_log_file_path = (
             expected_parsed_s3_log_folder_path / f"{test_parsed_s3_log_file_path.stem}.tsv"
         )
-        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path, index_col=0)
+        expected_parsed_s3_log = pandas.read_table(filepath_or_buffer=expected_parsed_s3_log_file_path)
         pandas.testing.assert_frame_equal(left=test_parsed_s3_log, right=expected_parsed_s3_log)
 
     post_test_error_folder_contents = list(error_folder.iterdir()) if error_folder.exists() else list()