Commit 05abb2c

fix size issue
CodyCBakerPhD committed Aug 6, 2024
1 parent a8cc153 commit 05abb2c
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -30,7 +30,7 @@ def parse_all_dandi_raw_s3_logs(
     excluded_ips: collections.defaultdict[str, bool] | None = None,
     exclude_github_ips: bool = True,
     number_of_jobs: int = 1,
-    maximum_ram_usage_in_bytes: int = 10**9,
+    maximum_ram_usage_in_bytes: int = 4 * 10**9,
 ) -> None:
     """
     Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files.
@@ -62,8 +62,8 @@ def parse_all_dandi_raw_s3_logs(
         Allows negative range to mean 'all but this many (minus one) jobs'.
         E.g., -1 means use all workers, -2 means all but one worker.
         WARNING: planned but not yet supported.
-    maximum_ram_usage_in_bytes : int, default: 1 GB
-        The theoretical maximum amount of RAM (in bytes) to be used throughout the process.
+    maximum_ram_usage_in_bytes : int, default: 4 GB
+        The theoretical maximum amount of RAM (in bytes) to be used across all the processes.
     """
     base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
     parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
@@ -116,6 +116,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         per_job_temporary_folder_path = temporary_folder_path / f"job_{job_index}"
         per_job_temporary_folder_path.mkdir(exist_ok=True)

+    maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs
+
     futures = []
     with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:
         for raw_s3_log_file_path in daily_raw_s3_log_file_paths:
@@ -129,7 +131,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                     excluded_ips=excluded_ips,
                     exclude_github_ips=False,  # Already included in list so avoid repeated construction
                     asset_id_handler=asset_id_handler,
-                    maximum_ram_usage_in_bytes=int(maximum_ram_usage_in_bytes / number_of_jobs),
+                    maximum_ram_usage_in_bytes=maximum_ram_usage_in_bytes_per_job,
                 )
             )

@@ -187,7 +189,7 @@ def parse_dandi_raw_s3_log(
     exclude_github_ips: bool = True,
     asset_id_handler: Callable | None = None,
     tqdm_kwargs: dict | None = None,
-    maximum_ram_usage_in_bytes: int = 40**9,
+    maximum_ram_usage_in_bytes: int = 4 * 10**9,
 ) -> None:
     """
     Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
@@ -224,7 +226,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             return split_by_slash[0] + "_" + split_by_slash[-1]
     tqdm_kwargs : dict, optional
         Keyword arguments to pass to the tqdm progress bar.
-    maximum_ram_usage_in_bytes : int, default: 1 GB
+    maximum_ram_usage_in_bytes : int, default: 4 GB
         The theoretical maximum amount of RAM (in bytes) to be used throughout the process.
     """
     tqdm_kwargs = tqdm_kwargs or dict()
@@ -268,7 +270,7 @@ def parse_raw_s3_log(
     excluded_ips: collections.defaultdict[str, bool] | None = None,
     asset_id_handler: Callable | None = None,
     tqdm_kwargs: dict | None = None,
-    maximum_ram_usage_in_bytes: int = 40**9,
+    maximum_ram_usage_in_bytes: int = 4 * 10**9,
 ) -> None:
     """
     Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
@@ -307,7 +309,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             return split_by_slash[0] + "_" + split_by_slash[-1]
     tqdm_kwargs : dict, optional
         Keyword arguments to pass to the tqdm progress bar.
-    maximum_ram_usage_in_bytes : int, default: 1 GB
+    maximum_ram_usage_in_bytes : int, default: 4 GB
         The theoretical maximum amount of RAM (in bytes) to be used throughout the process.
     """
     raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
@@ -369,7 +371,7 @@ def _get_reduced_log_lines(
     request_type: Literal["GET", "PUT"],
     excluded_ips: collections.defaultdict[str, bool],
     tqdm_kwargs: dict | None = None,
-    maximum_ram_usage_in_bytes: int = 10**9,
+    maximum_ram_usage_in_bytes: int = 4 * 10**9,
 ) -> list[ReducedLogLine]:
     """
     Reduce the full S3 log file to minimal content and return a list of in-memory collections.namedtuple objects.
@@ -386,7 +388,7 @@ def _get_reduced_log_lines(
         A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
     tqdm_kwargs : dict, optional
         Keyword arguments to pass to the tqdm progress bar.
-    maximum_ram_usage_in_bytes : int, default: 1 GB
+    maximum_ram_usage_in_bytes : int, default: 4 GB
         The theoretical maximum amount of RAM (in bytes) to be used throughout the process.
     """
     assert raw_s3_log_file_path.suffix == ".log", f"{raw_s3_log_file_path=} should end in '.log'!"
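
For context, a minimal sketch of the arithmetic behind this commit (illustrative only, not code from the repository): the previous default of 40**9 bytes appears to be a typo that evaluates to roughly 262 terabytes rather than the intended 4 gigabytes, and the total RAM budget is now divided across the worker jobs once, up front, with integer division instead of recomputing int(total / jobs) for every submitted file. The names mirror those in the diff; the worker count is a hypothetical value chosen for the example.

# Illustrative sketch only -- values taken from the diff above.
old_default = 40**9      # typo: 262_144_000_000_000 bytes (~262 TB)
new_default = 4 * 10**9  # intended: 4_000_000_000 bytes (4 GB)

# Split the total budget once across all worker jobs using floor division.
maximum_ram_usage_in_bytes = 4 * 10**9
number_of_jobs = 4  # hypothetical worker count for the example
maximum_ram_usage_in_bytes_per_job = maximum_ram_usage_in_bytes // number_of_jobs

print(old_default)                         # 262144000000000
print(new_default)                         # 4000000000
print(maximum_ram_usage_in_bytes_per_job)  # 1000000000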
