Restructure process #29

Merged · 11 commits · Aug 12, 2024
6 changes: 6 additions & 0 deletions README.md
@@ -19,6 +19,10 @@ Simple reductions of consolidated S3 logs (consolidation step not included in th

Developed for the [DANDI Archive](https://dandiarchive.org/).

A single line of a raw S3 log file is typically 400 to 1,000+ bytes, and some of the busiest daily logs on the archive contain around 5,014,386 lines. As of summer 2024, more than 6 TB of log files have been collected.

This parser reduces those logs to tens of GB of consolidated and anonymized usage data, which is far more manageable to share and plot.
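
For context, a raw S3 server access log line looks roughly like the following; every field value below is illustrative rather than copied from the archive:

```
79a59df900b949e55d96a1e698fbacedexample example-bucket [06/Aug/2024:00:00:01 +0000] 192.0.2.1 - 3E57427F3EXAMPLE REST.GET.OBJECT blobs/abc/def/example.nwb "GET /blobs/abc/def/example.nwb HTTP/1.1" 200 - 1048576 1048576 42 40 "-" "python-requests/2.31.0" -
```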



## Usage
@@ -29,6 +33,7 @@ To iteratively parse all historical logs all at once (parallelization with 10-15
parse_all_dandi_raw_s3_logs \
--base_raw_s3_log_folder_path < base log folder > \
--parsed_s3_log_folder_path < output folder > \
--excluded_log_files < any log files to skip > \
--excluded_ips < comma-separated list of known IPs to exclude > \
--maximum_number_of_workers < number of CPUs to use > \
--maximum_buffer_size_in_bytes < approximate amount of RAM to use >
@@ -40,6 +45,7 @@ For example, on Drogon:
parse_all_dandi_raw_s3_logs \
--base_raw_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs \
--parsed_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_7_13_2024/GET_per_asset_id \
--excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log \
--excluded_ips < Drogon's IP > \
--maximum_number_of_workers 3 \
--maximum_buffer_size_in_bytes 15000000000
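
The same run can be driven from Python. Here is a minimal sketch, assuming the keyword arguments mirror the CLI options wired up in `_command_line_interface.py` below; all paths are placeholders:

```python
from dandi_s3_log_parser import parse_all_dandi_raw_s3_logs

# Placeholder paths. excluded_ips may be None, or a collections.defaultdict(bool)
# with excluded addresses set to True, exactly as the CLI wrapper builds it.
parse_all_dandi_raw_s3_logs(
    base_raw_s3_log_folder_path="/path/to/raw/logs",
    parsed_s3_log_folder_path="/path/to/parsed/logs",
    excluded_log_files=["/path/to/raw/logs/stats/start-end.log"],
    excluded_ips=None,
    maximum_number_of_workers=3,
    maximum_buffer_size_in_bytes=15 * 10**9,
)
```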
19 changes: 18 additions & 1 deletion pyproject.toml
@@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]

[project]
name = "dandi_s3_log_parser"
version="1.0.0"
version="0.1.0"
authors = [
{ name="Cody Baker", email="[email protected]" },
]
@@ -70,6 +70,23 @@ extend-exclude = '''
exclude = [
"*/__init__.py"
]
line-length = 120

[tool.ruff.lint]
select = ["F", "E", "I"]
ignore = [
"PTH123",
"D203",
"D212",
"T201",
"FIX002",
"TD003",
"TD002",
"S101",
"ICN001",
"INP001",
]
fixable = ["ALL"]

[tool.ruff.lint.isort]
relative-imports-order = "closest-to-furthest"
4 changes: 2 additions & 2 deletions src/dandi_s3_log_parser/__init__.py
@@ -3,7 +3,7 @@
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH, IPINFO_CREDENTIALS, get_hash_salt
from ._s3_log_file_parser import parse_raw_s3_log
from ._buffered_text_reader import BufferedTextReader
from ._order_parsed_logs import order_parsed_logs
from ._order_and_anonymize_parsed_logs import order_and_anonymize_parsed_logs
from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs

__all__ = [
@@ -14,5 +14,5 @@
"parse_raw_s3_log",
"parse_dandi_raw_s3_log",
"parse_all_dandi_raw_s3_logs",
"order_parsed_logs",
"order_and_anonymize_parsed_logs",
]
9 changes: 6 additions & 3 deletions src/dandi_s3_log_parser/_buffered_text_reader.py
@@ -1,8 +1,9 @@
import pathlib
from typing import Self


class BufferedTextReader:
def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_bytes: int = 10**9):
def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_bytes: int = 10**9) -> None:
"""
Lazily read a text file into RAM using buffers of a specified size.

@@ -13,6 +14,7 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_byte
maximum_buffer_size_in_bytes : int, default: 1 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text file.

"""
self.file_path = file_path
self.maximum_buffer_size_in_bytes = maximum_buffer_size_in_bytes
@@ -25,7 +27,7 @@ def __init__(self, *, file_path: str | pathlib.Path, maximum_buffer_size_in_byte
self.number_of_buffers = int(self.total_file_size / self.buffer_size_in_bytes) + 1
self.offset = 0

def __iter__(self):
def __iter__(self) -> Self:
return self

def __next__(self) -> list[str]:
@@ -48,10 +50,11 @@ def __next__(self) -> list[str]:
last_line = split_intermediate_buffer[-1]

if len(buffer) == 0 and last_line != "":
raise ValueError(
message = (
f"BufferedTextReader encountered a line at offset {self.offset} that exceeds the buffer "
"size! Try increasing the `maximum_buffer_size_in_bytes` to account for this line."
)
raise ValueError(message)

# The last line split by the intermediate buffer may or may not be incomplete
if decoded_intermediate_buffer.endswith("\n"):
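
For reference, a minimal usage sketch of `BufferedTextReader` as it appears in this diff; the log path is hypothetical:

```python
from dandi_s3_log_parser import BufferedTextReader

# Hypothetical path to one raw daily log. Each iteration yields a list of lines
# read from a buffer of at most `maximum_buffer_size_in_bytes` bytes.
reader = BufferedTextReader(
    file_path="/path/to/raw/logs/2024/08/01.log",
    maximum_buffer_size_in_bytes=10**8,
)
for buffered_lines in reader:
    for raw_line in buffered_lines:
        ...  # hand each raw log line to the parser of your choice
```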
22 changes: 18 additions & 4 deletions src/dandi_s3_log_parser/_command_line_interface.py
@@ -3,12 +3,16 @@
import collections
import os
import pathlib
import click
from typing import Literal

from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
from .testing._helpers import find_random_example_line
import click

from ._config import REQUEST_TYPES
from ._dandi_s3_log_file_parser import (
parse_all_dandi_raw_s3_logs,
parse_dandi_raw_s3_log,
)
from .testing._helpers import find_random_example_line

NUMBER_OF_CPU = os.cpu_count() # Note: Not distinguishing if logical or not

@@ -26,6 +30,13 @@
required=True,
type=click.Path(writable=True),
)
@click.option(
"--excluded_log_files",
help="A comma-separated list of log files to exclude from parsing.",
required=False,
type=str,
default=None,
)
@click.option(
"--excluded_ips",
help="A comma-separated list of IP addresses to exclude from parsing.",
@@ -56,18 +67,21 @@
def parse_all_dandi_raw_s3_logs_cli(
base_raw_s3_log_folder_path: str,
parsed_s3_log_folder_path: str,
excluded_log_files: str | None,
excluded_ips: str | None,
maximum_number_of_workers: int,
maximum_buffer_size_in_bytes: int,
) -> None:
split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else []
split_excluded_log_files = excluded_log_files.split(",") if excluded_log_files is not None else list()
split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else list()
handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
for excluded_ip in split_excluded_ips:
handled_excluded_ips[excluded_ip] = True

parse_all_dandi_raw_s3_logs(
base_raw_s3_log_folder_path=base_raw_s3_log_folder_path,
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
excluded_log_files=split_excluded_log_files,
excluded_ips=handled_excluded_ips,
maximum_number_of_workers=maximum_number_of_workers,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
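
A quick sketch of the exclusion lookup used above: `collections.defaultdict(bool)` returns `False` for any address that was never explicitly marked, so the parser can test membership without risking a `KeyError`. The addresses below are hypothetical:

```python
import collections

handled_excluded_ips = collections.defaultdict(bool)
for excluded_ip in ["192.0.2.1", "198.51.100.7"]:  # hypothetical excluded addresses
    handled_excluded_ips[excluded_ip] = True

print(handled_excluded_ips["192.0.2.1"])    # True  -> this requester is skipped
print(handled_excluded_ips["203.0.113.9"])  # False -> this requester is kept
```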
8 changes: 4 additions & 4 deletions src/dandi_s3_log_parser/_config.py
@@ -1,6 +1,6 @@
import hashlib
import os
import pathlib
import hashlib

REQUEST_TYPES = ("GET", "PUT", "HEAD")

@@ -13,15 +13,15 @@

if "IPINFO_CREDENTIALS" not in os.environ:
raise ValueError(
"The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!"
"The environment variable 'IPINFO_CREDENTIALS' must be set to import `dandi_s3_log_parser`!",
) # pragma: no cover
IPINFO_CREDENTIALS = os.environ["IPINFO_CREDENTIALS"]

if "IPINFO_HASH_SALT" not in os.environ:
raise ValueError(
"The environment variable 'IPINFO_HASH_SALT' must be set to import `dandi_s3_log_parser`! "
"To retrieve the value, set a temporary value to this environment variable and then use the `get_hash_salt` "
"helper function and set it to the correct value."
"helper function and set it to the correct value.",
) # pragma: no cover
IPINFO_HASH_SALT = bytes.fromhex(os.environ["IPINFO_HASH_SALT"])

@@ -39,7 +39,7 @@ def get_hash_salt(base_raw_s3_log_folder_path: str | pathlib.Path) -> str:
# Retrieve the first line of the first log file (which only we know) and use that as a secure salt
first_log_file_path = base_raw_s3_log_folder_path / "2019" / "10" / "01.log"

with open(file=first_log_file_path, mode="r") as io:
with open(file=first_log_file_path) as io:
first_line = io.readline()

hash_salt = hashlib.sha1(string=bytes(first_line, "utf-8"))
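
The hash salt derived above is intended for anonymizing requester identity. The anonymization code itself is not part of this diff, so the following is only an illustrative sketch of how a salted hash could be applied to an IP address, not the library's implementation:

```python
import hashlib

def anonymize_ip(ip_address: str, hash_salt: bytes) -> str:
    # Illustrative only: hash the salt together with the address so the raw IP
    # never appears in the parsed output.
    return hashlib.sha1(hash_salt + bytes(ip_address, "utf-8")).hexdigest()

print(anonymize_ip(ip_address="192.0.2.1", hash_salt=bytes.fromhex("00ff00ff")))
```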