Improvements to summaries (#68)
* test binning skips

* debugging

* no fix needed

* add more summaries

* fix tests

* add region aggregation

* add subregions to services
CodyCBakerPhD authored Sep 12, 2024
1 parent b739f3f commit b59ae1d
Showing 50 changed files with 309 additions and 95 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -58,7 +58,7 @@ The process is designed to be easily parallelized and interruptible, meaning tha

### 2. **Binning**

To make the mapping to Dandisets more efficient, the reduced logs are binned by their object keys (asset blob IDs) for fast lookup.
To make the mapping to Dandisets more efficient, the reduced logs are binned by their object keys (asset blob IDs) for fast lookup. Zarr assets specifically group by the parent blob ID, *e.g.*, a request for `zarr/abcdefg/group1/dataset1/0` will be binned by `zarr/abcdefg`.

This step reduces the total file sizes from step (1) even further by reducing repeated object keys, though it does create a large number of small files.
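
For illustration, a minimal sketch of how a request's object key could be mapped to its bin key under this scheme (the `get_bin_key` helper below is hypothetical, not part of the package):

```python
def get_bin_key(object_key: str) -> str:
    """Return the key a request is binned under (illustrative sketch only)."""
    parts = object_key.split("/")
    if parts[0] == "zarr":
        # Zarr requests are grouped by the parent blob ID, e.g. 'zarr/abcdefg'
        return "/".join(parts[:2])
    # Blob requests are binned by their full object key (the asset blob ID)
    return object_key


assert get_bin_key("zarr/abcdefg/group1/dataset1/0") == "zarr/abcdefg"
assert get_bin_key("blobs/abc/def/abcdef123") == "blobs/abc/def/abcdef123"  # made-up blob key
```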

@@ -126,7 +126,6 @@ The `--file_processing_limit < integer >` flag can be used to limit the number o
bin_all_reduced_s3_logs_by_object_key \
--reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-reduced \
--binned_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-binned \
--file_limit 20
```

In the summer of 2024, this process took less than 5 hours to bin all 170 GB of reduced logs into the 80 GB of data per object key.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]

[project]
name = "dandi_s3_log_parser"
version="0.4.0"
version="0.4.1"
authors = [
{ name="Cody Baker", email="[email protected]" },
]
@@ -57,11 +57,12 @@ def bin_all_reduced_s3_logs_by_object_key(
for reduced_s3_log_file in tqdm.tqdm(
iterable=reduced_s3_log_files,
total=len(reduced_s3_log_files),
desc="Binning reduced logs...",
desc="Binning reduced logs",
position=0,
leave=True,
mininterval=3.0,
smoothing=0,
unit="file",
):
if reduced_s3_log_file.stat().st_size == 0:
with open(file=started_tracking_file_path, mode="a") as io:
@@ -93,11 +94,12 @@ def bin_all_reduced_s3_logs_by_object_key(
for object_key, data in tqdm.tqdm(
iterable=object_keys_to_data.items(),
total=len(object_keys_to_data),
desc="Writing binned logs...",
desc=f"Binning {reduced_s3_log_file}",
position=1,
leave=False,
mininterval=3.0,
smoothing=0,
unit="asset",
):
object_key_as_path = pathlib.Path(object_key)
binned_s3_log_file_path = (
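
For reference, a minimal standalone sketch of nested tqdm progress bars using the same keyword arguments as in the hunk above (`desc`, `position`, `leave`, `unit`); the file and object-key names here are made up for illustration:

```python
import time

import tqdm

reduced_log_files = [f"2024-01-0{i}.tsv" for i in range(1, 4)]  # hypothetical inputs
object_keys = [f"blobs/blob_{i}" for i in range(5)]

for file_name in tqdm.tqdm(
    iterable=reduced_log_files,
    total=len(reduced_log_files),
    desc="Binning reduced logs",
    position=0,
    leave=True,
    unit="file",
):
    # The inner bar is drawn on its own line (position=1) and cleared when done (leave=False)
    for object_key in tqdm.tqdm(
        iterable=object_keys,
        total=len(object_keys),
        desc=f"Binning {file_name}",
        position=1,
        leave=False,
        unit="asset",
    ):
        time.sleep(0.01)  # stand-in for writing one binned object key
```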
10 changes: 7 additions & 3 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -85,15 +85,15 @@ def reduce_all_dandi_raw_s3_logs(
fields_to_reduce = ["object_key", "timestamp", "bytes_sent", "ip_address"]
object_key_parents_to_reduce = ["blobs", "zarr"]
line_buffer_tqdm_kwargs = dict(position=1, leave=False)
# TODO: add better reporting units to all TQDMs (lines / s, files / s, etc.)
if maximum_number_of_workers == 1:
for relative_s3_log_file_path in tqdm.tqdm(
iterable=relative_s3_log_file_paths_to_reduce,
total=len(relative_s3_log_file_paths_to_reduce),
desc="Parsing log files...",
desc="Parsing log files",
position=0,
leave=True,
smoothing=0, # Use true historical average, not moving average since shuffling makes it more uniform
unit="file",
):
raw_s3_log_file_path = raw_s3_logs_folder_path / relative_s3_log_file_path
reduced_s3_log_file_path = (
@@ -144,6 +144,7 @@ def reduce_all_dandi_raw_s3_logs(
leave=True,
mininterval=3.0,
smoothing=0, # Use true historical average, not moving average since shuffling makes it more uniform
unit="file",
)
for future in progress_bar_iterable:
future.result() # This is the call that finally triggers the deployment to the workers
@@ -177,7 +178,10 @@ def _multi_worker_reduce_dandi_raw_s3_log(
object_key_parents_to_reduce = ["blobs", "zarr"]
object_key_handler = _get_default_dandi_object_key_handler()
line_buffer_tqdm_kwargs = dict(
position=worker_index + 1, leave=False, desc=f"Parsing line buffers on worker {worker_index + 1}..."
position=worker_index + 1,
leave=False,
desc=f"Parsing line buffers on worker {worker_index + 1}...",
unit="buffer",
)

reduce_raw_s3_log(
101 changes: 69 additions & 32 deletions src/dandi_s3_log_parser/_ip_utils.py
@@ -44,28 +44,39 @@ def get_region_from_ip_address(
raise ValueError(message) # pragma: no cover
ip_hash_salt = bytes.fromhex(os.environ["IP_HASH_SALT"])

# Probably a legitimate user, so fetch the geographic region
# Hash for anonymization within the cache
ip_hash = hashlib.sha1(string=bytes(ip_address, "utf-8") + ip_hash_salt).hexdigest()

# Early return for speed
# Early return from the cache for faster performance
lookup_result = ip_hash_to_region.get(ip_hash, None)
if lookup_result is not None:
return lookup_result

# Determine if IP address belongs to GitHub, AWS, Google, or known VPNs
# Azure not yet easily doable; keep an eye on
# https://learn.microsoft.com/en-us/answers/questions/1410071/up-to-date-azure-public-api-to-get-azure-ip-ranges
# and others, maybe it will change in the future
# maybe it will change in the future
if ip_hash_not_in_services.get(ip_hash, None) is None:
for service_name in _KNOWN_SERVICES:
cidr_addresses = _get_cidr_address_ranges(service_name=service_name)

if any(
ipaddress.ip_address(address=ip_address) in ipaddress.ip_network(address=cidr_address)
for cidr_address in cidr_addresses
):
ip_hash_to_region[ip_hash] = service_name
return service_name
cidr_addresses_and_subregions = _get_cidr_address_ranges_and_subregions(service_name=service_name)

matched_cidr_address_and_subregion = next(
(
(cidr_address, subregion)
for cidr_address, subregion in cidr_addresses_and_subregions
if ipaddress.ip_address(address=ip_address) in ipaddress.ip_network(address=cidr_address)
),
None,
)
if matched_cidr_address_and_subregion is not None:
region_service_string = service_name

subregion = matched_cidr_address_and_subregion[1]
if subregion is not None:
region_service_string += f"/{subregion}"

ip_hash_to_region[ip_hash] = region_service_string
return region_service_string
ip_hash_not_in_services[ip_hash] = True

# Log errors in IP fetching
@@ -105,49 +116,75 @@ def get_region_from_ip_address(


@functools.lru_cache
def _get_cidr_address_ranges(*, service_name: str) -> list[str]:
def _get_cidr_address_ranges_and_subregions(*, service_name: str) -> list[tuple[str, str | None]]:
cidr_request = _request_cidr_range(service_name=service_name)
match service_name:
case "GitHub":
github_cidr_request = requests.get(url="https://api.github.com/meta").json()
skip_keys = ["domains", "ssh_key_fingerprints", "verifiable_password_authentication", "ssh_keys"]
keys = set(github_cidr_request.keys()) - set(skip_keys)
github_cidr_addresses = [
cidr_address
keys = set(cidr_request.keys()) - set(skip_keys)
github_cidr_addresses_and_subregions = [
(cidr_address, None)
for key in keys
for cidr_address in github_cidr_request[key]
for cidr_address in cidr_request[key]
if "::" not in cidr_address
# Skip IPv6
]

return github_cidr_addresses
return github_cidr_addresses_and_subregions
# Note: these endpoints also return the 'locations' of the specific subnet, such as 'us-east-2'
case "AWS":
aws_cidr_request = requests.get(url="https://ip-ranges.amazonaws.com/ip-ranges.json").json()
aws_cidr_addresses = [prefix["ip_prefix"] for prefix in aws_cidr_request["prefixes"]]
aws_cidr_addresses_and_subregions = [
(prefix["ip_prefix"], prefix.get("region", None)) for prefix in cidr_request["prefixes"]
]

return aws_cidr_addresses
return aws_cidr_addresses_and_subregions
case "GCP":
gcp_cidr_request = requests.get(url="https://www.gstatic.com/ipranges/cloud.json").json()
gcp_cidr_addresses = [
prefix["ipv4Prefix"]
for prefix in gcp_cidr_request["prefixes"]
gcp_cidr_addresses_and_subregions = [
(prefix["ipv4Prefix"], prefix.get("scope", None))
for prefix in cidr_request["prefixes"]
if "ipv4Prefix" in prefix # Not handling IPv6 yet
]

return gcp_cidr_addresses
return gcp_cidr_addresses_and_subregions
case "Azure":
raise NotImplementedError("Azure CIDR address fetching is not yet implemented!") # pragma: no cover
case "VPN":
vpn_cidr_addresses_and_subregions = [(cidr_address, None) for cidr_address in cidr_request]

return vpn_cidr_addresses_and_subregions
case _:
raise ValueError(f"Service name '{service_name}' is not supported!") # pragma: no cover


@functools.lru_cache
def _request_cidr_range(service_name: str) -> dict:
"""Cache (in-memory) the requests to external services."""
match service_name:
case "GitHub":
github_cidr_request = requests.get(url="https://api.github.com/meta").json()

return github_cidr_request
case "AWS":
aws_cidr_request = requests.get(url="https://ip-ranges.amazonaws.com/ip-ranges.json").json()

return aws_cidr_request
case "GCP":
gcp_cidr_request = requests.get(url="https://www.gstatic.com/ipranges/cloud.json").json()

return gcp_cidr_request
case "Azure":
raise NotImplementedError("Azure CIDR address fetching is not yet implemented!")
case "VPN":
# Very nice public and maintained listing! Hope this stays stable.
vpn_cidr_addresses = (
vpn_cidr_request = (
requests.get(
url="https://raw.githubusercontent.com/josephrocca/is-vpn/main/vpn-or-datacenter-ipv4-ranges.txt"
)
.content.decode("utf-8")
.splitlines()
)

return vpn_cidr_addresses
return vpn_cidr_request
case _:
raise ValueError(f"Service name '{service_name}' is not supported!") # pragma: no cover

@@ -157,16 +194,16 @@ def _load_ip_hash_cache(*, name: Literal["region", "services"]) -> dict[str, str
match name:
case "region":
if not _IP_HASH_TO_REGION_FILE_PATH.exists():
return {} # pragma: no cover
return dict() # pragma: no cover

with open(file=_IP_HASH_TO_REGION_FILE_PATH) as stream:
return yaml.load(stream=stream, Loader=yaml.SafeLoader)
return yaml.load(stream=stream, Loader=yaml.SafeLoader) or dict()
case "services":
if not _IP_HASH_NOT_IN_SERVICES_FILE_PATH.exists():
return {} # pragma: no cover
return dict() # pragma: no cover

with open(file=_IP_HASH_NOT_IN_SERVICES_FILE_PATH) as stream:
return yaml.load(stream=stream, Loader=yaml.SafeLoader)
return yaml.load(stream=stream, Loader=yaml.SafeLoader) or dict()
case _:
raise ValueError(f"Name '{name}' is not recognized!") # pragma: no cover
