
Commit aac39d1: fixes
CodyCBakerPhD committed Aug 16, 2024
1 parent 7be2404 commit aac39d1
Showing 3 changed files with 26 additions and 28 deletions.
32 changes: 15 additions & 17 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -46,7 +46,7 @@ def reduce_all_dandi_raw_s3_logs(
base_raw_s3_logs_folder_path : file path
The Path to the folder containing the raw S3 log files to be reduced.
reduced_s3_logs_folder_path : file path
- The Path to write each parsed S3 log file to.
+ The Path to write each reduced S3 log file to.
There will be one file per handled asset ID.
maximum_number_of_workers : int, default: 1
The maximum number of workers to distribute tasks across.
@@ -61,8 +61,6 @@ def reduce_all_dandi_raw_s3_logs(
excluded_ips : collections.defaultdict(bool), optional
A lookup table whose keys are IP addresses to exclude from reduction.
"""
- base_raw_s3_logs_folder_path = pathlib.Path(base_raw_s3_logs_folder_path)
- reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path)
excluded_ips = excluded_ips or collections.defaultdict(bool)

asset_id_handler = _get_default_dandi_asset_id_handler()
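The `excluded_ips` default above is a `collections.defaultdict(bool)`, whose missing-key value is `False`: an IP is excluded only if it was explicitly flagged, and lookups for unseen addresses never raise. A minimal sketch of that lookup pattern (the addresses are hypothetical):

```python
import collections

# defaultdict(bool) yields False for unseen keys, so no KeyError and no exclusion by default.
excluded_ips = collections.defaultdict(bool)
excluded_ips["192.0.2.1"] = True  # hypothetical address to skip during reduction

print(excluded_ips["192.0.2.1"])     # True -> exclude this request line
print(excluded_ips["198.51.100.7"])  # False -> keep it
```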
@@ -119,9 +117,9 @@ def reduce_all_dandi_raw_s3_logs(
futures.append(
executor.submit(
_multi_worker_reduce_dandi_raw_s3_log,
- maximum_number_of_workers=maximum_number_of_workers,
raw_s3_log_file_path=raw_s3_log_file_path,
temporary_folder_path=temporary_folder_path,
+ maximum_number_of_workers=maximum_number_of_workers,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_worker,
excluded_ips=excluded_ips,
),
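The move of `maximum_number_of_workers` in this `executor.submit` call simply mirrors the new parameter order of `_multi_worker_reduce_dandi_raw_s3_log` below; since every argument is passed by keyword, behavior is unchanged. A small sketch of why call-site order is irrelevant for keyword-only parameters (the function and names are illustrative, not from this repository):

```python
def reduce_log(*, raw_path: str, workers: int) -> None:  # bare '*' makes all parameters keyword-only
    print(raw_path, workers)

# Equivalent calls: keyword arguments may appear in any order.
reduce_log(raw_path="a.log", workers=2)
reduce_log(workers=2, raw_path="a.log")
```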
@@ -149,22 +147,22 @@ def reduce_all_dandi_raw_s3_logs(
leave=True,
mininterval=2.0,
):
- per_worker_parsed_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
+ per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
assert (
- len(per_worker_parsed_s3_log_file_paths) != 0
+ len(per_worker_reduced_s3_log_file_paths) != 0
), f"No files found in {per_worker_temporary_folder_path}!"

- for per_worker_parsed_s3_log_file_path in tqdm.tqdm(
- iterable=per_worker_parsed_s3_log_file_paths,
+ for per_worker_reduced_s3_log_file_path in tqdm.tqdm(
+ iterable=per_worker_reduced_s3_log_file_paths,
desc="Merging results per worker...",
- total=len(per_worker_parsed_s3_log_file_paths),
+ total=len(per_worker_reduced_s3_log_file_paths),
position=1,
leave=False,
mininterval=2.0,
):
- merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_parsed_s3_log_file_path.name
+ merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name

- parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_parsed_s3_log_file_path, header=0)
+ parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)

header = False if merged_temporary_file_path.exists() else True
parsed_s3_log.to_csv(
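The merge loop above appends each worker's per-asset TSV onto a shared output file and writes the header only when the file is first created. A minimal sketch of that write-header-once append pattern (the file name and columns are hypothetical):

```python
import pathlib

import pandas

merged_path = pathlib.Path("merged.tsv")  # hypothetical merged output file
chunk = pandas.DataFrame(data={"timestamp": ["2024-08-16"], "bytes_sent": [1024]})

# Emit the header only on the first write, then append rows on later passes.
header = not merged_path.exists()
chunk.to_csv(path_or_buf=merged_path, mode="a", sep="\t", header=header, index=False)
```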
@@ -182,11 +180,11 @@
# pragma: no cover
def _multi_worker_reduce_dandi_raw_s3_log(
*,
- maximum_number_of_workers: int,
raw_s3_log_file_path: pathlib.Path,
temporary_folder_path: pathlib.Path,
- excluded_ips: collections.defaultdict[str, bool] | None,
+ maximum_number_of_workers: int,
maximum_buffer_size_in_bytes: int,
+ excluded_ips: collections.defaultdict[str, bool] | None,
) -> None:
"""
A mostly pass-through function to calculate the worker index on the worker and target the correct subfolder.
@@ -211,7 +209,7 @@ def _multi_worker_reduce_dandi_raw_s3_log(
date = datetime.datetime.now().strftime("%y%m%d")
parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
error_message += (
f"Worker index {worker_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
f"Worker index {worker_index}/{maximum_number_of_workers} reducing {raw_s3_log_file_path} failed due to\n\n"
)

reduce_dandi_raw_s3_log(
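The parallel error log name above embeds the parser version and a `%y%m%d` date stamp. A quick check of what that `strftime` pattern produces:

```python
import datetime

# For Aug 16, 2024 the "%y%m%d" pattern yields a compact six-digit stamp.
print(datetime.datetime(2024, 8, 16).strftime("%y%m%d"))  # "240816"
```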
@@ -243,7 +241,7 @@ def reduce_dandi_raw_s3_log(
tqdm_kwargs: dict | None = None,
) -> None:
"""
- Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
+ Reduce a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
'Parsing' here means:
- limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -254,7 +252,7 @@
raw_s3_log_file_path : string or pathlib.Path
Path to the raw S3 log file to be reduced.
reduced_s3_logs_folder_path : string or pathlib.Path
- The path to write each parsed S3 log file to.
+ The path to write each reduced S3 log file to.
There will be one file per handled asset ID.
mode : "w" or "a", default: "a"
How to resolve the case when files already exist in the folder containing parsed logs.
@@ -290,7 +288,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:

reduce_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
- parsed_s3_log_folder_path=reduced_s3_logs_folder_path,
+ reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
mode=mode,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
bucket=bucket,
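With this commit the public entry points consistently take `reduced_s3_logs_folder_path`. A minimal usage sketch under that assumption (the paths are hypothetical, and the output folder must already exist):

```python
import pathlib

import dandi_s3_log_parser

reduced_folder = pathlib.Path("/data/reduced_logs")  # hypothetical output folder
reduced_folder.mkdir(exist_ok=True)

dandi_s3_log_parser.reduce_dandi_raw_s3_log(
    raw_s3_log_file_path="/data/raw_logs/2024-08-16.log",  # hypothetical input file
    reduced_s3_logs_folder_path=reduced_folder,
)
```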
14 changes: 6 additions & 8 deletions src/dandi_s3_log_parser/_s3_log_file_reducer.py
@@ -10,7 +10,7 @@

import pandas
import tqdm
- from pydantic import FilePath, validate_call
+ from pydantic import DirectoryPath, FilePath, validate_call

from ._buffered_text_reader import BufferedTextReader
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
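Annotating the output folder as `pydantic.DirectoryPath` under `@validate_call` makes pydantic coerce strings to `pathlib.Path` objects and reject paths that are not existing directories, which is presumably why the manual `pathlib.Path(...)` conversions are dropped below. A minimal sketch of that validation behavior (the function is illustrative, not from this repository):

```python
import tempfile

from pydantic import DirectoryPath, ValidationError, validate_call


@validate_call
def show(*, folder: DirectoryPath) -> None:
    print(folder)  # received as a pathlib.Path subclass


show(folder=tempfile.mkdtemp())  # str is coerced to pathlib.Path and checked to exist
try:
    show(folder="/no/such/folder/here")
except ValidationError as error:
    print(type(error).__name__)  # the path does not exist, so validation fails
```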
@@ -21,7 +21,7 @@
def reduce_raw_s3_log(
*,
raw_s3_log_file_path: FilePath,
- parsed_s3_log_folder_path: FilePath,
+ reduced_s3_logs_folder_path: DirectoryPath,
mode: Literal["w", "a"] = "a",
maximum_buffer_size_in_bytes: int = 4 * 10**9,
bucket: str | None = None,
@@ -44,8 +44,8 @@ def reduce_raw_s3_log(
----------
raw_s3_log_file_path : str or pathlib.Path
The path to the raw S3 log file.
- parsed_s3_log_folder_path : str or pathlib.Path
- The path to write each parsed S3 log file to.
+ reduced_s3_log_folder_path : str or pathlib.Path
+ The path to write each reduced S3 log file to.
There will be one file per handled asset ID.
mode : "w" or "a", default: "a"
How to resolve the case when files already exist in the folder containing parsed logs.
@@ -75,9 +75,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
tqdm_kwargs : dict, optional
Keyword arguments to pass to the tqdm progress bar for line buffers.
"""
- raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
- parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
- parsed_s3_log_folder_path.mkdir(exist_ok=True)
+ reduced_s3_logs_folder_path.mkdir(exist_ok=True)
bucket = bucket or ""
excluded_ips = excluded_ips or collections.defaultdict(bool)
asset_id_handler = asset_id_handler or (lambda asset_id: asset_id)
@@ -96,7 +94,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
)

for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items():
- parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv"
+ parsed_s3_log_file_path = reduced_s3_logs_folder_path / f"{handled_asset_id}.tsv"

data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id)

8 changes: 5 additions & 3 deletions tests/test_reduce_dandi_raw_s3_log_bad_lines.py
@@ -22,12 +22,14 @@ def test_reduce_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None:
example_raw_s3_log_file_path = examples_folder_path / "0.log"
expected_reduced_s3_logs_folder_path = examples_folder_path / "expected_output"

- test_reduced_s3_log_folder_path = tmpdir / "reduced_example_2"
+ test_reduced_s3_logs_folder_path = tmpdir / "reduced_example_2"
+ test_reduced_s3_logs_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser.reduce_dandi_raw_s3_log(
raw_s3_log_file_path=example_raw_s3_log_file_path,
- reduced_s3_logs_folder_path=test_reduced_s3_log_folder_path,
+ reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path,
)
- test_output_file_paths = list(test_reduced_s3_log_folder_path.iterdir())
+ test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir())

number_of_output_files = len(test_output_file_paths)
expected_number_of_output_files = 3
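The two added lines create the output folder up front: with the argument now typed as `DirectoryPath`, a not-yet-existing folder would fail validation before any reduction runs. A sketch of the same setup using pytest's built-in `tmp_path` fixture (a hypothetical test, not part of this commit):

```python
import pathlib


def test_reduced_folder_exists_before_call(tmp_path: pathlib.Path) -> None:
    # DirectoryPath-typed arguments must point at a directory that already exists.
    reduced_folder = tmp_path / "reduced_example"
    reduced_folder.mkdir(exist_ok=True)
    assert reduced_folder.is_dir()
```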
