Add Dandiset mapping tool (#40)
* added mappers and tests

* debugging

* force index alignment in parallel test

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD authored Aug 16, 2024
1 parent 6fbbb74 commit 516a7d0
Showing 21 changed files with 308 additions and 226 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -52,7 +52,7 @@ For example, on Drogon:
parse_all_dandi_raw_s3_logs \
--base_raw_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs \
--parsed_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
--excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log \
--excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log,/mnt/backup/dandi/dandiarchive-logs/2024/05/.git/gc.log \
--excluded_ips < Drogons IP > \
--maximum_number_of_workers 6 \
--maximum_buffer_size_in_mb 5000
12 changes: 5 additions & 7 deletions src/dandi_s3_log_parser/__init__.py
@@ -3,18 +3,16 @@
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._s3_log_file_parser import parse_raw_s3_log
from ._buffered_text_reader import BufferedTextReader
from ._order_and_anonymize_parsed_logs import order_and_anonymize_parsed_logs
from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
from ._ip_utils import get_hash_salt
from ._log_utils import find_all_known_operation_types
from ._ip_utils import get_region_from_ip_address
from ._dandiset_mapper import map_reduced_logs_to_all_dandisets

__all__ = [
"DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
"BufferedTextReader",
"get_hash_salt",
"parse_raw_s3_log",
"BufferedTextReader",
"parse_dandi_raw_s3_log",
"parse_all_dandi_raw_s3_logs",
"order_and_anonymize_parsed_logs",
"find_all_known_operation_types",
"get_region_from_ip_address",
"map_reduced_logs_to_all_dandisets",
]
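
For orientation, a minimal import sketch of the package surface after this change, using only names listed in the updated `__all__` (the anonymization and operation-type helpers are no longer exported):

# Sketch: public entry points exported by dandi_s3_log_parser after this commit.
from dandi_s3_log_parser import (
    BufferedTextReader,
    get_region_from_ip_address,
    map_reduced_logs_to_all_dandisets,
    parse_all_dandi_raw_s3_logs,
    parse_dandi_raw_s3_log,
    parse_raw_s3_log,
)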
96 changes: 96 additions & 0 deletions src/dandi_s3_log_parser/_dandiset_mapper.py
@@ -0,0 +1,96 @@
import os
import pathlib

import dandi.dandiapi
import pandas
import tqdm
from pydantic import DirectoryPath, validate_call

from ._ip_utils import _load_ip_hash_to_region_cache, get_region_from_ip_address


@validate_call
def map_reduced_logs_to_all_dandisets(
    reduced_s3_log_folder_path: DirectoryPath, all_dandiset_logs_folder_path: DirectoryPath
) -> None:
    if "IPINFO_CREDENTIALS" not in os.environ:
        message = "The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!"
        raise ValueError(message)  # pragma: no cover

    if "IP_HASH_SALT" not in os.environ:
        message = (
            "The environment variable 'IP_HASH_SALT' must be set to import `dandi_s3_log_parser`! "
            "To retrieve the value, set a temporary value to this environment variable "
            "and then use the `get_hash_salt` helper function and set it to the correct value."
        )
        raise ValueError(message)  # pragma: no cover

    client = dandi.dandiapi.DandiAPIClient()

    ip_hash_to_region = _load_ip_hash_to_region_cache()
    current_dandisets = list(client.get_dandisets())
    for dandiset in tqdm.tqdm(
        iterable=current_dandisets,
        total=len(current_dandisets),
        desc="Mapping reduced logs to Dandisets...",
        position=0,
        mininterval=5.0,
        smoothing=0,
    ):
        _map_reduced_logs_to_dandiset(
            dandiset=dandiset,
            reduced_s3_log_folder_path=reduced_s3_log_folder_path,
            all_dandiset_logs_folder_path=all_dandiset_logs_folder_path,
            client=client,
            ip_hash_to_region=ip_hash_to_region,
        )


def _map_reduced_logs_to_dandiset(
    dandiset: dandi.dandiapi.RemoteDandiset,
    reduced_s3_log_folder_path: pathlib.Path,
    all_dandiset_logs_folder_path: pathlib.Path,
    client: dandi.dandiapi.DandiAPIClient,
    ip_hash_to_region: dict[str, str],
) -> None:
    dandiset_id = dandiset.identifier

    dandiset_log_folder_path = all_dandiset_logs_folder_path / dandiset_id

    for version in dandiset.get_versions():
        version_id = version.identifier

        dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

        all_reduced_logs = []
        for asset in dandiset_version.get_assets():
            asset_id = asset.identifier
            asset_suffixes = pathlib.Path(asset.path).suffixes

            blob_or_zarr = "blobs" if ".zarr" not in asset_suffixes else "zarr"

            reduced_log_file_path = reduced_s3_log_folder_path / f"{blob_or_zarr}_{asset_id}.tsv"

            if not reduced_log_file_path.exists():
                continue  # No reduced logs found (possibly the asset was never accessed); skip to the next asset

            reduced_log = pandas.read_table(filepath_or_buffer=reduced_log_file_path, header=0)
            reduced_log["asset_id"] = [asset_id] * len(reduced_log)
            reduced_log["region"] = [
                get_region_from_ip_address(ip_address=ip_address, ip_hash_to_region=ip_hash_to_region)
                for ip_address in reduced_log["ip_address"]
            ]

            reordered_reduced_log = reduced_log.reindex(columns=("asset_id", "timestamp", "bytes_sent", "region"))
            all_reduced_logs.append(reordered_reduced_log)

        if len(all_reduced_logs) == 0:
            continue  # No reduced logs found (possibly the version was never accessed); skip to the next version

        mapped_log = pandas.concat(objs=all_reduced_logs, ignore_index=True)
        mapped_log = mapped_log.sort_values(by="timestamp")  # Assign the result; sort_values is not in-place by default
        mapped_log.index = range(len(mapped_log))

        dandiset_log_folder_path.mkdir(exist_ok=True)
        version_file_path = dandiset_log_folder_path / f"{version_id}.tsv"
        mapped_log.to_csv(version_file_path, mode="w", sep="\t", header=True, index=True)
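
A minimal usage sketch for the new mapper, assuming the reduced per-asset TSV files already exist and that the two environment variables checked above are set; the folder paths and credential values below are placeholders, not part of the repository:

# Sketch: mapping reduced per-asset logs onto all Dandisets.
import os
import pathlib

from dandi_s3_log_parser import map_reduced_logs_to_all_dandisets

# Placeholder values; both variables are required by map_reduced_logs_to_all_dandisets.
os.environ["IPINFO_CREDENTIALS"] = "<ipinfo token>"
os.environ["IP_HASH_SALT"] = "<salt reported by get_hash_salt>"

# Hypothetical locations: the reduced per-asset logs and an (existing) output folder.
reduced_logs = pathlib.Path("/mnt/backup/dandi/reduced_logs")
mapped_logs = pathlib.Path("/mnt/backup/dandi/mapped_dandiset_logs")
mapped_logs.mkdir(exist_ok=True)

map_reduced_logs_to_all_dandisets(
    reduced_s3_log_folder_path=reduced_logs,
    all_dandiset_logs_folder_path=mapped_logs,
)

Each accessed Dandiset version then yields one TSV at <all_dandiset_logs_folder_path>/<dandiset_id>/<version_id>.tsv with the columns asset_id, timestamp, bytes_sent, and region.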
138 changes: 58 additions & 80 deletions src/dandi_s3_log_parser/_ip_utils.py
@@ -5,7 +5,6 @@
import importlib.metadata
import ipaddress
import os
import pathlib
import traceback

import ipinfo
@@ -19,85 +18,7 @@
)


def get_hash_salt(base_raw_s3_log_folder_path: FilePath) -> str:
"""
Calculate the salt (in hexadecimal encoding) used for IP hashing.
Uses actual data from the first line of the first log file in the raw S3 log folder, which only we have access to.
Otherwise, it would be fairly easy to iterate over every possible IP address and find the SHA1 of it.
"""
base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)

# Retrieve the first line of the first log file (which only we know) and use that as a secure salt
first_log_file_path = base_raw_s3_log_folder_path / "2019" / "10" / "01.log"

with open(file=first_log_file_path) as io:
first_line = io.readline()

hash_salt = hashlib.sha1(string=bytes(first_line, "utf-8"))

return hash_salt.hexdigest()


def _cidr_address_to_ip_range(*, cidr_address: str) -> list[str]:
"""Convert a CIDR address to a list of IP addresses."""
cidr_address_class = type(ipaddress.ip_address(cidr_address.split("/")[0]))
ip_address_range = []
if cidr_address_class is ipaddress.IPv4Address:
ip_address_range = ipaddress.IPv4Network(address=cidr_address)
elif cidr_address_class is ipaddress.IPv6Address: # pragma: no cover
ip_address_range = ipaddress.IPv6Network(address=cidr_address)

return [str(ip_address) for ip_address in ip_address_range]


def _get_latest_github_ip_ranges() -> list[str]:
"""Retrieve the latest GitHub CIDR ranges from their API and expand them into a list of IP addresses."""
github_ip_request = requests.get("https://api.github.com/meta").json()

skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"]
keys = set(github_ip_request.keys()) - set(skip_keys)
github_cidr_addresses = [
cidr_address for key in keys for cidr_address in github_ip_request[key] if "::" not in cidr_address # Skip IPv6
]

all_github_ips = [
str(ip_address)
for cidr_address in github_cidr_addresses
for ip_address in _cidr_address_to_ip_range(cidr_address=cidr_address)
]

return all_github_ips


def _load_ip_hash_to_region_cache() -> dict[str, str]:
"""Load the IP hash to region cache from disk."""
if not _IP_HASH_TO_REGION_FILE_PATH.exists():
return {} # pragma: no cover

with open(file=_IP_HASH_TO_REGION_FILE_PATH) as stream:
return yaml.load(stream=stream, Loader=yaml.SafeLoader)


def _save_ip_hash_to_region_cache(*, ip_hash_to_region: dict[str, str]) -> None:
"""Save the IP hash to region cache to disk."""
with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="w") as stream:
yaml.dump(data=ip_hash_to_region, stream=stream)


def _save_ip_address_to_region_cache(
ip_hash_to_region: dict[str, str],
ip_hash_to_region_file_path: FilePath | None = None,
) -> None:
"""Save the IP address to region cache to disk."""
ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH

with open(file=ip_hash_to_region_file_path, mode="w") as stream:
yaml.dump(data=ip_hash_to_region, stream=stream)


def _get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, str]) -> str | None:
def get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, str]) -> str | None:
"""
If the parsed S3 logs are meant to be shared openly, the remote IP could be used to directly identify individuals.
@@ -168,3 +89,60 @@ def _get_region_from_ip_address(ip_address: str, ip_hash_to_region: dict[str, str]) -> str | None:
)

return "unknown"


def _cidr_address_to_ip_range(*, cidr_address: str) -> list[str]:
"""Convert a CIDR address to a list of IP addresses."""
cidr_address_class = type(ipaddress.ip_address(cidr_address.split("/")[0]))
ip_address_range = []
if cidr_address_class is ipaddress.IPv4Address:
ip_address_range = ipaddress.IPv4Network(address=cidr_address)
elif cidr_address_class is ipaddress.IPv6Address: # pragma: no cover
ip_address_range = ipaddress.IPv6Network(address=cidr_address)

return [str(ip_address) for ip_address in ip_address_range]


def _get_latest_github_ip_ranges() -> list[str]:
"""Retrieve the latest GitHub CIDR ranges from their API and expand them into a list of IP addresses."""
github_ip_request = requests.get("https://api.github.com/meta").json()

skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"]
keys = set(github_ip_request.keys()) - set(skip_keys)
github_cidr_addresses = [
cidr_address for key in keys for cidr_address in github_ip_request[key] if "::" not in cidr_address # Skip IPv6
]

all_github_ips = [
str(ip_address)
for cidr_address in github_cidr_addresses
for ip_address in _cidr_address_to_ip_range(cidr_address=cidr_address)
]

return all_github_ips


def _load_ip_hash_to_region_cache() -> dict[str, str]:
"""Load the IP hash to region cache from disk."""
if not _IP_HASH_TO_REGION_FILE_PATH.exists():
return {} # pragma: no cover

with open(file=_IP_HASH_TO_REGION_FILE_PATH) as stream:
return yaml.load(stream=stream, Loader=yaml.SafeLoader)


def _save_ip_hash_to_region_cache(*, ip_hash_to_region: dict[str, str]) -> None:
"""Save the IP hash to region cache to disk."""
with open(file=_IP_HASH_TO_REGION_FILE_PATH, mode="w") as stream:
yaml.dump(data=ip_hash_to_region, stream=stream)


def _save_ip_address_to_region_cache(
ip_hash_to_region: dict[str, str],
ip_hash_to_region_file_path: FilePath | None = None,
) -> None:
"""Save the IP address to region cache to disk."""
ip_hash_to_region_file_path = ip_hash_to_region_file_path or _IP_HASH_TO_REGION_FILE_PATH

with open(file=ip_hash_to_region_file_path, mode="w") as stream:
yaml.dump(data=ip_hash_to_region, stream=stream)
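
For the relocated private CIDR helper, a small deterministic illustration (a sketch against internal, non-public API; the documentation range 192.0.2.0/30 expands to four addresses, network and broadcast included):

# Sketch: expanding a /30 CIDR block with the private helper moved in this commit.
from dandi_s3_log_parser._ip_utils import _cidr_address_to_ip_range

ip_addresses = _cidr_address_to_ip_range(cidr_address="192.0.2.0/30")
assert ip_addresses == ["192.0.2.0", "192.0.2.1", "192.0.2.2", "192.0.2.3"]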
39 changes: 0 additions & 39 deletions src/dandi_s3_log_parser/_log_utils.py

This file was deleted.

48 changes: 0 additions & 48 deletions src/dandi_s3_log_parser/_order_and_anonymize_parsed_logs.py

This file was deleted.
