various debugs and enhancements; more tests
CodyCBakerPhD committed Aug 10, 2024
1 parent 7468313 commit e097e0d
Showing 3 changed files with 68 additions and 33 deletions.
16 changes: 16 additions & 0 deletions src/dandi_s3_log_parser/_command_line_interface.py
@@ -1,6 +1,7 @@
"""Call the raw S3 log parser from the command line."""

import collections
import os
import pathlib
import click
from typing import Literal
@@ -9,6 +10,8 @@
from .testing._helpers import find_random_example_line
from ._config import REQUEST_TYPES

NUMBER_OF_CPU = os.cpu_count() # Note: Not distinguishing if logical or not


@click.command(name="parse_all_dandi_raw_s3_logs")
@click.option(
@@ -45,12 +48,24 @@
type=str,
default=None,
)
@click.option(
"--number_of_jobs",
help="The number of jobs to use for parallel processing.",
required=False,
type=int,
default=1,
)
def parse_all_dandi_raw_s3_logs_cli(
base_raw_s3_log_folder_path: str,
parsed_s3_log_folder_path: str,
mode: Literal["w", "a"] = "a",
excluded_ips: str | None = None,
number_of_jobs: int = 1,
) -> None:
number_of_jobs = NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs
assert number_of_jobs > 0, "The number of jobs must be greater than 0."
assert number_of_jobs <= NUMBER_OF_CPU, "The number of jobs must be less than or equal to the number of CPUs."

split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else []
handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
for excluded_ip in split_excluded_ips:
@@ -61,6 +76,7 @@ def parse_all_dandi_raw_s3_logs_cli(
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
mode=mode,
excluded_ips=handled_excluded_ips,
number_of_jobs=number_of_jobs,
)


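A minimal sketch (not from the commit itself) of how the negative --number_of_jobs convention added above resolves against the CPU count; the helper name resolve_number_of_jobs is hypothetical, and the formula simply mirrors the one in this file.

import os

NUMBER_OF_CPU = os.cpu_count()


def resolve_number_of_jobs(number_of_jobs: int) -> int:
    """Mirror the CLI logic above: negative values count backwards from the total CPU count (joblib-style)."""
    return NUMBER_OF_CPU + number_of_jobs + 1 if number_of_jobs < 0 else number_of_jobs


# On a machine where os.cpu_count() == 8:
#   resolve_number_of_jobs(-1) -> 8  (use every CPU)
#   resolve_number_of_jobs(-2) -> 7  (leave one CPU free)
#   resolve_number_of_jobs(2)  -> 2  (positive values pass through unchanged)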
81 changes: 50 additions & 31 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -5,9 +5,11 @@
import pathlib
import os
import shutil
import traceback
import uuid
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Literal
import importlib.metadata

import pandas
import tqdm
@@ -130,21 +132,19 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
raw_s3_log_file_path=raw_s3_log_file_path,
temporary_folder_path=temporary_folder_path,
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job,
)
)

# Perform the iteration to trigger processing
for _ in tqdm.tqdm(
progress_bar_iterable = tqdm.tqdm(
iterable=as_completed(futures),
desc=f"Parsing log files using {number_of_jobs} jobs...",
total=len(daily_raw_s3_log_file_paths),
position=0,
leave=True,
):
pass
)
for future in progress_bar_iterable:
future.result() # This is the call that finally triggers the deployment to the workers

print("\n\nParallel parsing complete!\n\n")
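For context, a minimal self-contained sketch of the futures pattern used in the hunk above (the _square worker and its inputs are made up): iterating over as_completed and calling .result() on each future blocks until that job finishes and re-raises any exception hit inside the worker, rather than letting it pass silently.

from concurrent.futures import ProcessPoolExecutor, as_completed


def _square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(_square, value) for value in range(4)]
        # as_completed yields each future as soon as it finishes;
        # .result() re-raises any exception raised inside the worker process.
        for future in as_completed(futures):
            print(future.result())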

Expand All @@ -156,6 +156,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
leave=True,
):
per_job_parsed_s3_log_file_paths = list(per_job_temporary_folder_path.iterdir())
assert len(per_job_parsed_s3_log_file_paths) != 0, f"No files found in {per_job_temporary_folder_path}!"

for per_job_parsed_s3_log_file_path in tqdm.tqdm(
iterable=per_job_parsed_s3_log_file_paths,
desc="Merging results per job...",
@@ -192,25 +194,51 @@ def _multi_job_parse_dandi_raw_s3_log(
raw_s3_log_file_path: pathlib.Path,
temporary_folder_path: pathlib.Path,
excluded_ips: collections.defaultdict[str, bool] | None,
exclude_github_ips: bool,
asset_id_handler: Callable | None,
maximum_ram_usage_in_bytes: int,
) -> None:
"""A mostly pass-through function to calculate the job index on the worker and target the correct subfolder."""
job_index = os.getpid() % number_of_jobs
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
"""
A mostly pass-through function to calculate the job index on the worker and target the correct subfolder.
parse_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=per_job_temporary_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=exclude_github_ips,
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=job_index + 1, leave=False),
maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes,
order_results=False, # Always disable this for parallel processing
)
Also dumps error stack (which is only typically seen by the worker and not sent back to the main stdout pipe)
to a log file.
"""

try:
error_message = ""

def asset_id_handler(*, raw_asset_id: str) -> str:
"""Apparently callables, even simple built-in ones, cannot be pickled."""
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]

job_index = os.getpid() % number_of_jobs
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"

# Define error catching stuff as part of the try clause
# so that if there is a problem within that, it too is caught
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
error_message += f"Job index {job_index}/{number_of_jobs} parsing {raw_s3_log_file_path} failed due to\n\n"

parse_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=per_job_temporary_folder_path,
mode="a",
excluded_ips=excluded_ips,
exclude_github_ips=False, # Already included in list so avoid repeated construction
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=job_index + 1, leave=False),
maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes,
order_results=False, # Always disable this for parallel processing
)
except Exception as exception:
with open(file=parallel_errors_file_path, mode="a") as io:
error_message += f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}\n\n"
io.write(error_message)

return None
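A tiny sketch (with placeholder paths and job count) of the job-index scheme the worker above relies on: the worker's process ID modulo the number of jobs picks a "job_<index>" subfolder, so a given worker process always appends into the same per-job directory.

import os
import pathlib

# Placeholder values for illustration only.
number_of_jobs = 4
temporary_folder_path = pathlib.Path("/tmp/example_parsing_run")

# Same scheme as the worker above: os.getpid() is fixed for the life of the process,
# so every call from this worker resolves to the same subfolder.
job_index = os.getpid() % number_of_jobs
per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
per_job_temporary_folder_path.mkdir(parents=True, exist_ok=True)
print(per_job_temporary_folder_path)  # e.g. /tmp/example_parsing_run/job_2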

@@ -419,15 +447,6 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
header = False if parsed_s3_log_file_path.exists() is True and mode == "a" else True
data_frame.to_csv(path_or_buf=parsed_s3_log_file_path, mode=mode, sep="\t", header=header, index=False)

# Keep a log of progress
progress_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "progress"
progress_folder_path.mkdir(exist_ok=True)

date = datetime.datetime.now().strftime("%y%m%d")
progress_file_path = progress_folder_path / f"{date}.txt"
with open(file=progress_file_path, mode="a") as io:
io.write(f"Parsed {raw_s3_log_file_path} successfully!\n")

if order_results is True:
order_parsed_logs(
unordered_parsed_s3_log_folder_path=temporary_output_folder_path,
4 changes: 2 additions & 2 deletions src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -18,7 +18,7 @@
import datetime
import pathlib
import re
from importlib.metadata import version as importlib_version
import importlib.metadata

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._ip_utils import _get_region_from_ip_address
@@ -104,7 +104,7 @@ def _get_full_log_line(
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser_version = importlib_version(distribution_name="dandi_s3_log_parser")
dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_lines_errors.txt"

