Better convenience (#36)
* fixes

* swap CLI to MiB

* OK, MB is simpler

* actually apply correction

* write specific line errors to error collection file

* write specific line errors to error collection file

* add note

* add more to list of known requests

* limit task ID generation

* add newlines

* silly mistake

* add note

* remove comment; use good values in README

* fixes

* align index on tqdm

* update recommendations; adjust line buffer tqdm

* try not messing with iters

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD authored Aug 14, 2024
1 parent b86be2f commit 93cec44
Showing 5 changed files with 95 additions and 58 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -44,7 +44,7 @@ parse_all_dandi_raw_s3_logs \
--excluded_log_files < any log files to skip> \
--excluded_ips < comma-separated list of known IPs to exclude > \
--maximum_number_of_workers < number of CPUs to use > \
--maximum_buffer_size_in_bytes < approximate amount of RAM to use >
--maximum_buffer_size_in_mb < approximate amount of RAM to use >
```

For example, on Drogon:
@@ -55,8 +55,8 @@ parse_all_dandi_raw_s3_logs \
--parsed_s3_log_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_7_13_2024/GET_per_asset_id \
--excluded_log_files /mnt/backup/dandi/dandiarchive-logs/stats/start-end.log \
--excluded_ips < Drogons IP > \
--maximum_number_of_workers 3 \
--maximum_buffer_size_in_bytes 15000000000
--maximum_number_of_workers 6 \
--maximum_buffer_size_in_mb 10000
```

To parse only a single log file at a time, such as in a CRON job:
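As a quick sanity check on the new README values, here is a small illustrative sketch (not part of the commit) of how a `--maximum_buffer_size_in_mb` budget translates into a per-worker byte budget, using the MB-to-bytes conversion and the integer split over workers that appear in the changed source files below; the script and its printout are purely hypothetical.

```python
# Illustrative only: reproduce the arithmetic implied by the Drogon example above,
# using the MB -> bytes conversion (* 10**6) and the per-worker integer split
# (// maximum_number_of_workers) visible in the changed source files below.
maximum_buffer_size_in_mb = 10_000   # value from the README example
maximum_number_of_workers = 6        # value from the README example

maximum_buffer_size_in_bytes = maximum_buffer_size_in_mb * 10**6
per_worker_buffer_size_in_bytes = maximum_buffer_size_in_bytes // maximum_number_of_workers

print(f"total buffer:      {maximum_buffer_size_in_bytes:,} bytes")
print(f"buffer per worker: {per_worker_buffer_size_in_bytes:,} bytes (~{per_worker_buffer_size_in_bytes / 10**9:.2f} GB)")
```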
11 changes: 6 additions & 5 deletions src/dandi_s3_log_parser/_command_line_interface.py
@@ -52,31 +52,32 @@
default=1,
)
@click.option(
"--maximum_buffer_size_in_bytes",
"--maximum_buffer_size_in_MB",
help=""""
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
The theoretical maximum amount of RAM (in MB) to use on each buffer iteration when reading from the
source text files.
Actual total RAM usage will be higher due to overhead and caching.
Automatically splits this total amount over the maximum number of workers if `maximum_number_of_workers` is
greater than one.
""",
required=False,
type=click.IntRange(min=10**6), # Minimum of 1 MB
default=4 * 10**9,
type=click.IntRange(min=1), # Bare minimum of 1 MB
default=1_000, # 1 GB recommended
)
def parse_all_dandi_raw_s3_logs_cli(
base_raw_s3_log_folder_path: str,
parsed_s3_log_folder_path: str,
excluded_log_files: str | None,
excluded_ips: str | None,
maximum_number_of_workers: int,
maximum_buffer_size_in_bytes: int,
maximum_buffer_size_in_mb: int,
) -> None:
split_excluded_log_files = excluded_log_files.split(",") if excluded_log_files is not None else list()
split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else list()
handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
for excluded_ip in split_excluded_ips:
handled_excluded_ips[excluded_ip] = True
maximum_buffer_size_in_bytes = maximum_buffer_size_in_mb * 10**6

parse_all_dandi_raw_s3_logs(
base_raw_s3_log_folder_path=base_raw_s3_log_folder_path,
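For context, here is a minimal, self-contained sketch of the option pattern introduced above, assuming only `click`; the command name `demo_buffer_option` and the echoed output are invented for illustration and are not part of the package.

```python
# A stripped-down sketch of the new --maximum_buffer_size_in_MB option: accept an
# integer number of MB at the CLI boundary and convert it to bytes once, so the
# underlying parser can keep working in bytes.
import click


@click.command(name="demo_buffer_option")  # hypothetical command, for illustration only
@click.option(
    "--maximum_buffer_size_in_MB",
    "maximum_buffer_size_in_mb",  # explicit lowercase parameter name for the Python function
    help="Approximate amount of RAM (in MB) to use per buffer iteration.",
    required=False,
    type=click.IntRange(min=1),  # bare minimum of 1 MB
    default=1_000,  # 1 GB recommended
)
def demo_buffer_option(maximum_buffer_size_in_mb: int) -> None:
    maximum_buffer_size_in_bytes = maximum_buffer_size_in_mb * 10**6
    click.echo(f"{maximum_buffer_size_in_mb} MB -> {maximum_buffer_size_in_bytes} bytes")


if __name__ == "__main__":
    demo_buffer_option()
```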
69 changes: 38 additions & 31 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -91,7 +91,7 @@ def parse_all_dandi_raw_s3_logs(
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
else:
# Create a fresh temporary directory in the home folder and then fresh subfolders for each job
# Create a fresh temporary directory in the home folder and then fresh subfolders for each worker
temporary_base_folder_path = parsed_s3_log_folder_path / ".temp"
temporary_base_folder_path.mkdir(exist_ok=True)

@@ -103,61 +103,66 @@
temporary_folder_path = temporary_base_folder_path / task_id
temporary_folder_path.mkdir(exist_ok=True)

per_job_temporary_folder_paths = list()
for job_index in range(maximum_number_of_workers):
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
per_job_temporary_folder_path.mkdir(exist_ok=True)
per_job_temporary_folder_paths.append(per_job_temporary_folder_path)
per_worker_temporary_folder_paths = list()
for worker_index in range(maximum_number_of_workers):
per_worker_temporary_folder_path = temporary_folder_path / f"worker_{worker_index}"
per_worker_temporary_folder_path.mkdir(exist_ok=True)
per_worker_temporary_folder_paths.append(per_worker_temporary_folder_path)

maximum_buffer_size_in_bytes_per_job = maximum_buffer_size_in_bytes // maximum_number_of_workers
maximum_buffer_size_in_bytes_per_worker = maximum_buffer_size_in_bytes // maximum_number_of_workers

futures = []
with ProcessPoolExecutor(max_workers=maximum_number_of_workers) as executor:
for raw_s3_log_file_path in daily_raw_s3_log_file_paths:
futures.append(
executor.submit(
_multi_job_parse_dandi_raw_s3_log,
_multi_worker_parse_dandi_raw_s3_log,
maximum_number_of_workers=maximum_number_of_workers,
raw_s3_log_file_path=raw_s3_log_file_path,
temporary_folder_path=temporary_folder_path,
excluded_ips=excluded_ips,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_job,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_worker,
),
)

progress_bar_iterable = tqdm.tqdm(
iterable=as_completed(futures),
desc=f"Parsing log files using {maximum_number_of_workers} jobs...",
desc=f"Parsing log files using {maximum_number_of_workers} workers...",
total=len(daily_raw_s3_log_file_paths),
position=0,
leave=True,
mininterval=2.0,
smoothing=0, # Use true historical average, not moving average since shuffling makes it more uniform
)
for future in progress_bar_iterable:
future.result() # This is the call that finally triggers the deployment to the workers

print("\n\nParallel parsing complete!\n\n")

for per_job_temporary_folder_path in tqdm.tqdm(
iterable=per_job_temporary_folder_paths,
desc="Merging results across jobs...",
total=len(per_job_temporary_folder_paths),
for per_worker_temporary_folder_path in tqdm.tqdm(
iterable=per_worker_temporary_folder_paths,
desc="Merging results across workers...",
total=len(per_worker_temporary_folder_paths),
position=0,
leave=True,
mininterval=2.0,
):
per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir())
assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!"

for per_job_parsed_s3_log_file_path in tqdm.tqdm(
iterable=per_job_parsed_s3_log_file_paths,
desc="Merging results per job...",
total=len(per_job_parsed_s3_log_file_paths),
per_worker_parsed_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
assert (
len(per_worker_parsed_s3_log_file_paths) != 0
), f"No files found in {per_worker_temporary_folder_path}!"

for per_worker_parsed_s3_log_file_path in tqdm.tqdm(
iterable=per_worker_parsed_s3_log_file_paths,
desc="Merging results per worker...",
total=len(per_worker_parsed_s3_log_file_paths),
position=1,
leave=False,
mininterval=3.0,
mininterval=2.0,
):
merged_temporary_file_path = parsed_s3_log_folder_path / per_job_parsed_s3_log_file_path.name
merged_temporary_file_path = parsed_s3_log_folder_path / per_worker_parsed_s3_log_file_path.name

parsed_s3_log = pandas.read_table(filepath_or_buffer=per_job_parsed_s3_log_file_path, header=0)
parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_parsed_s3_log_file_path, header=0)

header = False if merged_temporary_file_path.exists() else True
parsed_s3_log.to_csv(
@@ -173,7 +178,7 @@ def parse_all_dandi_raw_s3_logs(

# Function cannot be covered because the line calls occur on subprocesses
# pragma: no cover
def _multi_job_parse_dandi_raw_s3_log(
def _multi_worker_parse_dandi_raw_s3_log(
*,
maximum_number_of_workers: int,
raw_s3_log_file_path: pathlib.Path,
@@ -182,7 +187,7 @@ def _multi_job_parse_dandi_raw_s3_log(
maximum_buffer_size_in_bytes: int,
) -> None:
"""
A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.
A mostly pass-through function to calculate the worker index on the worker and target the correct subfolder.
Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe)
to a log file.
@@ -192,8 +197,8 @@ def _multi_job_parse_dandi_raw_s3_log(

asset_id_handler = _get_default_dandi_asset_id_handler()

job_index = os.getpid() % maximum_number_of_workers
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
worker_index = os.getpid() % maximum_number_of_workers
per_worker_temporary_folder_path = temporary_folder_path / f"worker_{worker_index}"

# Define error catching stuff as part of the try clause
# so that if there is a problem within that, it too is caught
@@ -204,16 +209,18 @@
date = datetime.datetime.now().strftime("%y%m%d")
parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
error_message += (
f"Job index {job_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
f"Worker index {worker_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
)

parse_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=per_job_temporary_folder_path,
parsed_s3_log_folder_path=per_worker_temporary_folder_path,
mode="a",
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=job_index + 1, leave=False),
tqdm_kwargs=dict(
position=worker_index + 1, leave=False, desc=f"Parsing line buffers on worker {worker_index+1}..."
),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
except Exception as exception:
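A condensed sketch of the worker-folder selection used by the renamed `_multi_worker_parse_dandi_raw_s3_log` above: each subprocess maps its PID onto a `worker_{index}` subfolder via a modulo, mirroring the diff. The helper name, demo paths, and `main` block are hypothetical; the real function additionally parses the log and records errors.

```python
# Sketch (standard library only) of the per-worker folder selection shown above:
# the process ID modulo the worker count picks a stable "worker_{index}" subfolder
# for everything this subprocess writes.
import os
import pathlib


def pick_per_worker_folder(*, temporary_folder_path: pathlib.Path, maximum_number_of_workers: int) -> pathlib.Path:
    worker_index = os.getpid() % maximum_number_of_workers
    per_worker_temporary_folder_path = temporary_folder_path / f"worker_{worker_index}"
    per_worker_temporary_folder_path.mkdir(parents=True, exist_ok=True)
    return per_worker_temporary_folder_path


if __name__ == "__main__":
    # Hypothetical usage with an invented temporary location.
    folder = pick_per_worker_folder(
        temporary_folder_path=pathlib.Path("demo_parsed_logs") / ".temp",
        maximum_number_of_workers=6,
    )
    print(f"This process (PID {os.getpid()}) would write into: {folder}")
```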
8 changes: 5 additions & 3 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -2,6 +2,7 @@

import collections
import pathlib
import uuid
from collections.abc import Callable
from typing import Literal

@@ -136,9 +137,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
A map of all reduced log line content binned by handled asset ID.
"""
tqdm_kwargs = tqdm_kwargs or dict()

# Perform I/O read in batches to improve performance
resolved_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False, mininterval=5.0)
default_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False)
resolved_tqdm_kwargs = dict(default_tqdm_kwargs)
resolved_tqdm_kwargs.update(tqdm_kwargs)

reduced_and_binned_logs = collections.defaultdict(list)
@@ -152,6 +152,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
**resolved_tqdm_kwargs,
)

task_id = str(uuid.uuid4())[:5]
per_buffer_index = 0
for buffered_raw_lines in progress_bar_iterator:
for index, raw_line in enumerate(buffered_raw_lines):
@@ -166,6 +167,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
excluded_ips=excluded_ips,
log_file_path=raw_s3_log_file_path,
line_index=line_index,
task_id=task_id,
)
per_buffer_index += index

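Two small helpers sketched below isolate the additions in this file: a short random task ID to keep parallel error-collection files from colliding, and the default/override merge for tqdm keyword arguments. The helper names are invented here; the commit inlines both directly.

```python
# Sketch of the two additions above (helper names are illustrative; the commit
# inlines this logic rather than defining functions).
import uuid


def make_task_id() -> str:
    """Short random suffix used to keep per-run error-collection files unique."""
    return str(uuid.uuid4())[:5]


def resolve_tqdm_kwargs(tqdm_kwargs: dict | None = None) -> dict:
    """Overlay caller-supplied tqdm options on top of the defaults."""
    tqdm_kwargs = tqdm_kwargs or dict()
    default_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False)
    resolved_tqdm_kwargs = dict(default_tqdm_kwargs)
    resolved_tqdm_kwargs.update(tqdm_kwargs)
    return resolved_tqdm_kwargs


print(make_task_id())                        # e.g. 'a3f91'
print(resolve_tqdm_kwargs({"position": 2}))  # defaults plus the caller's override
```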
59 changes: 43 additions & 16 deletions src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -23,9 +23,20 @@

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH

_KNOWN_REQUEST_TYPES = collections.defaultdict(bool)
for request_type in ["GET", "PUT", "HEAD"]:
_KNOWN_REQUEST_TYPES[request_type] = True
# Known forms:
# REST.GET.OBJECT
# REST.PUT.OBJECT
# REST.HEAD.OBJECT
# REST.POST.OBJECT
# REST.DELETE.OBJECT
# REST.OPTIONS.PREFLIGHT
# BATCH.DELETE.OBJECT
# Longer names are truncated for lower data overhead via direct slicing based on known lengths and separator locations
_KNOWN_REQUEST_TYPES = ["GET", "PUT", "HEAD", "POST", "DELE", "OPTI", ".DEL"]

_IS_REQUEST_TYPE_KNOWN = collections.defaultdict(bool)
for request_type in _KNOWN_REQUEST_TYPES:
_IS_REQUEST_TYPE_KNOWN[request_type] = True

_FULL_PATTERN_TO_FIELD_MAPPING = [
"bucket_owner",
Expand Down Expand Up @@ -70,9 +81,10 @@ def _append_reduced_log_line(
excluded_ips: collections.defaultdict[str, bool],
line_index: int,
log_file_path: pathlib.Path,
task_id: str,
) -> None:
"""
Append the `reduced_and_binned_logs` map with informatione extracted from a single raw log line, if it is valid.
Append the `reduced_and_binned_logs` map with information extracted from a single raw log line, if it is valid.
Parameters
----------
@@ -97,7 +109,9 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
line_index: int
The index of the line in the raw log file.
log_file_path: pathlib.Path
The path to the log file being parsed; attached for logging purposes.
The path to the log file being parsed; attached for error collection purposes.
task_id: str
A unique task ID to ensure that error collection files are unique when parallelizing to avoid race conditions.
"""
parsed_log_line = _parse_s3_log_line(raw_line=raw_line)

@@ -106,6 +120,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
log_file_path=log_file_path,
line_index=line_index,
raw_line=raw_line,
task_id=task_id,
)

if full_log_line is None:
@@ -115,26 +130,37 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
if full_log_line.bucket != bucket:
return

# Raise some quick parsing errors if anything indicates an improper parsing
# Apply some minimal validation and contribute any invalidations to error collection
# These might slow parsing down a bit, but could be important to ensuring accuracy
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt"

if not full_log_line.status_code.isdigit():
message = f"Unexpected status code: '{full_log_line.status_code}' on line {line_index} of file {log_file_path}."
raise ValueError(message)
message = (
f"Unexpected status code: '{full_log_line.status_code}' on line {line_index} of file {log_file_path}.\n\n"
)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)

# An expected operation string is "REST.GET.OBJECT"
operation_slice = slice(5, 8) if full_log_line.operation[8] == "." else slice(5, 9)
handled_request_type = full_log_line.operation[operation_slice]
if _KNOWN_REQUEST_TYPES[handled_request_type] is False:
if _IS_REQUEST_TYPE_KNOWN[handled_request_type] is False:
message = (
f"Unexpected request type: '{handled_request_type}' handled from '{full_log_line.operation}' "
f"on line {line_index} of file {log_file_path}."
f"on line {line_index} of file {log_file_path}.\n\n"
)
raise ValueError(message)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)

timezone = full_log_line.timestamp[-5:] != "+0000"
if timezone:
message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`."
raise ValueError(message)
message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`.\n\n"
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)

# More early skip conditions
# Only accept 200-block status codes
@@ -165,7 +191,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
def _find_all_possible_substring_indices(*, string: str, substring: str) -> list[int]:
indices = list()
start = 0
max_iter = 1000
max_iter = 10**6
while True and start < max_iter:
next_index = string.find(substring, start)
if next_index == -1: # .find(...) was unable to locate the substring
@@ -234,6 +260,7 @@ def _get_full_log_line(
log_file_path: pathlib.Path,
line_index: int,
raw_line: str,
task_id: str,
) -> _FullLogLine | None:
"""Construct a FullLogLine from a single parsed log line, or dump to error collection file and return None."""
full_log_line = None
Expand All @@ -259,7 +286,7 @@ def _get_full_log_line(

dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt"
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt"

# TODO: automatically attempt to anonymize any detectable IP address in the raw line by replacing with 192.0.2.0
with open(file=lines_errors_file_path, mode="a") as io:
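To make the new request-type handling above easier to follow, here is a self-contained sketch of the fixed-position slicing and the `defaultdict(bool)` membership test; the loop at the bottom, including the made-up `REST.MADEUP.OBJECT` operation, is only for demonstration.

```python
# Sketch of the request-type extraction introduced above: the (possibly truncated)
# request type is sliced out of the raw operation string at fixed positions, and the
# defaultdict(bool) lets unknown types read back as False instead of raising KeyError.
import collections

_KNOWN_REQUEST_TYPES = ["GET", "PUT", "HEAD", "POST", "DELE", "OPTI", ".DEL"]

_IS_REQUEST_TYPE_KNOWN = collections.defaultdict(bool)
for request_type in _KNOWN_REQUEST_TYPES:
    _IS_REQUEST_TYPE_KNOWN[request_type] = True


def handled_request_type(operation: str) -> str:
    """Slice the request type out of an operation such as 'REST.GET.OBJECT' or 'BATCH.DELETE.OBJECT'."""
    operation_slice = slice(5, 8) if operation[8] == "." else slice(5, 9)
    return operation[operation_slice]


# The last operation is invented to show the unknown path.
for operation in ["REST.GET.OBJECT", "REST.HEAD.OBJECT", "BATCH.DELETE.OBJECT", "REST.MADEUP.OBJECT"]:
    request_type = handled_request_type(operation)
    print(f"{operation} -> '{request_type}' (known: {_IS_REQUEST_TYPE_KNOWN[request_type]})")
```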
