diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
index 902a4fa..6487cf9 100644
--- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -281,14 +281,14 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     tqdm_kwargs = tqdm_kwargs or dict()
 
     bucket = "dandiarchive"
-    request_type = "GET"
+    operation_type = "REST.GET.OBJECT"
 
     parse_raw_s3_log(
         raw_s3_log_file_path=raw_s3_log_file_path,
         parsed_s3_log_folder_path=parsed_s3_log_folder_path,
         mode=mode,
         bucket=bucket,
-        request_type=request_type,
+        operation_type=operation_type,
         excluded_ips=excluded_ips,
         asset_id_handler=asset_id_handler,
         tqdm_kwargs=tqdm_kwargs,
diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py
index 6c3e658..19266fc 100644
--- a/src/dandi_s3_log_parser/_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -8,18 +8,20 @@
 import pandas
 import tqdm
+from pydantic import validate_call
 
 from ._buffered_text_reader import BufferedTextReader
-from ._s3_log_line_parser import _append_reduced_log_line
+from ._s3_log_line_parser import _KNOWN_OPERATION_TYPES, _append_reduced_log_line
 
 
+@validate_call
 def parse_raw_s3_log(
     *,
     raw_s3_log_file_path: str | pathlib.Path,
     parsed_s3_log_folder_path: str | pathlib.Path,
     mode: Literal["w", "a"] = "a",
     bucket: str | None = None,
-    request_type: Literal["GET", "PUT"] = "GET",
+    operation_type: Literal[_KNOWN_OPERATION_TYPES] = "REST.GET.OBJECT",
     excluded_ips: collections.defaultdict[str, bool] | None = None,
     asset_id_handler: Callable | None = None,
     tqdm_kwargs: dict | None = None,
@@ -47,8 +49,8 @@ def parse_raw_s3_log(
         over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
     bucket : str
         Only parse and return lines that match this bucket.
-    request_type : str, default: "GET"
-        The type of request to filter for.
+    operation_type : str, default: "REST.GET.OBJECT"
+        The type of operation to filter for.
     excluded_ips : collections.defaultdict of strings to booleans, optional
         A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
     asset_id_handler : callable, optional
@@ -79,7 +81,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         raw_s3_log_file_path=raw_s3_log_file_path,
         asset_id_handler=asset_id_handler,
         bucket=bucket,
-        request_type=request_type,
+        operation_type=operation_type,
         excluded_ips=excluded_ips,
         tqdm_kwargs=tqdm_kwargs,
         maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
@@ -99,7 +101,7 @@ def _get_reduced_and_binned_log_lines(
     raw_s3_log_file_path: pathlib.Path,
     asset_id_handler: Callable,
     bucket: str,
-    request_type: Literal["GET", "PUT"],
+    operation_type: Literal[_KNOWN_OPERATION_TYPES],
     excluded_ips: collections.defaultdict[str, bool],
     tqdm_kwargs: dict,
     maximum_buffer_size_in_bytes: int,
@@ -121,8 +123,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             return split_by_slash[0] + "_" + split_by_slash[-1]
     bucket : str
         Only parse and return lines that match this bucket.
-    request_type : str
-        The type of request to filter for.
+    operation_type : str
+        The type of operation to filter for.
     excluded_ips : collections.defaultdict of strings to booleans
         A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
     tqdm_kwargs : dict, optional
@@ -163,7 +165,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
                 reduced_and_binned_logs=reduced_and_binned_logs,
                 asset_id_handler=asset_id_handler,
                 bucket=bucket,
-                request_type=request_type,
+                operation_type=operation_type,
                 excluded_ips=excluded_ips,
                 log_file_path=raw_s3_log_file_path,
                 line_index=line_index,
diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py
index 5590ff3..4134b20 100644
--- a/src/dandi_s3_log_parser/_s3_log_line_parser.py
+++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -17,24 +17,26 @@
 import pathlib
 import re
 from collections.abc import Callable
+from typing import Literal
 
 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
 
-# Known forms:
-# REST.GET.OBJECT
-# REST.PUT.OBJECT
-# REST.HEAD.OBJECT
-# REST.POST.OBJECT
-# REST.COPY.PART and REST.COPY.OBJECT_GET
-# REST.DELETE.OBJECT
-# REST.OPTIONS.PREFLIGHT
-# BATCH.DELETE.OBJECT
-# Longer names are truncated for lower data overhead via direct slicing based on known lengths and separator locations
-_KNOWN_REQUEST_TYPES = ["GET", "PUT", "HEAD", "POST", "COPY", "DELE", "OPTI", ".DEL"]
-
-_IS_REQUEST_TYPE_KNOWN = collections.defaultdict(bool)
-for request_type in _KNOWN_REQUEST_TYPES:
-    _IS_REQUEST_TYPE_KNOWN[request_type] = True
+_KNOWN_OPERATION_TYPES = (
+    "REST.GET.OBJECT",
+    "REST.PUT.OBJECT",
+    "REST.HEAD.OBJECT",
+    "REST.POST.OBJECT",
+    "REST.COPY.PART",
+    "REST.COPY.OBJECT_GET",
+    "REST.DELETE.OBJECT",
+    "REST.OPTIONS.PREFLIGHT",
+    "BATCH.DELETE.OBJECT",
+    "WEBSITE.GET.OBJECT",
+)
+
+_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool)
+for operation_type in _KNOWN_OPERATION_TYPES:
+    _IS_OPERATION_TYPE_KNOWN[operation_type] = True
 
 _FULL_PATTERN_TO_FIELD_MAPPING = [
     "bucket_owner",
@@ -75,7 +77,7 @@ def _append_reduced_log_line(
     reduced_and_binned_logs: collections.defaultdict[str, dict[str, list[str | int]]],
     asset_id_handler: Callable,
     bucket: str,
-    request_type: str,
+    operation_type: Literal[_KNOWN_OPERATION_TYPES],
     excluded_ips: collections.defaultdict[str, bool],
     line_index: int,
     log_file_path: pathlib.Path,
@@ -100,8 +102,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             return split_by_slash[0] + "_" + split_by_slash[-1]
     bucket : string
         Only parse and return lines that match this bucket string.
-    request_type : string
-        The type of request to filter for.
+    operation_type : string
+        The type of operation to filter for.
     excluded_ips : collections.defaultdict of strings to booleans
         A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing.
     line_index: int
@@ -144,11 +146,10 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         with open(file=lines_errors_file_path, mode="a") as io:
             io.write(message)
 
-    operation_slice = slice(5, 8) if full_log_line.operation[8] == "." else slice(5, 9)
-    handled_request_type = full_log_line.operation[operation_slice]
-    if _IS_REQUEST_TYPE_KNOWN[handled_request_type] is False:
+    handled_operation_type = full_log_line.operation
+    if _IS_OPERATION_TYPE_KNOWN[handled_operation_type] is False:
         message = (
-            f"Unexpected request type: '{handled_request_type}' handled from '{full_log_line.operation}' "
+            f"Unexpected operation type: '{handled_operation_type}' "
             f"on line {line_index} of file {log_file_path}.\n\n"
         )
         with open(file=lines_errors_file_path, mode="a") as io:
@@ -165,7 +166,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     if full_log_line.status_code[0] != "2":
         return
 
-    if handled_request_type != request_type:
+    if handled_operation_type != operation_type:
         return
 
     if excluded_ips[full_log_line.ip_address] is True:
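
Notes on the change (illustrative sketches, not part of the patch):

The core of this diff is dropping the truncated-slice matching of `full_log_line.operation` (e.g. `"GET"`, `".DEL"`) in favor of comparing the full operation string against an explicit tuple of known values, validated at the API boundary by pydantic. A minimal sketch of the runtime behavior the new `Literal[_KNOWN_OPERATION_TYPES]` annotation relies on, with the tuple abbreviated and the function name `select` invented for illustration:

```python
from typing import Literal

from pydantic import validate_call

_KNOWN_OPERATION_TYPES = ("REST.GET.OBJECT", "REST.PUT.OBJECT")  # abbreviated


# At runtime, subscripting Literal with a tuple unpacks its elements, so this
# annotation is equivalent to Literal["REST.GET.OBJECT", "REST.PUT.OBJECT"].
@validate_call
def select(*, operation_type: Literal[_KNOWN_OPERATION_TYPES] = "REST.GET.OBJECT") -> str:
    return operation_type


select(operation_type="REST.PUT.OBJECT")  # passes validation
select(operation_type="GET")  # raises pydantic.ValidationError
```

Static type checkers generally reject a variable inside `Literal[...]`; the tuple unpacking above is runtime behavior, which is exactly what `@validate_call` consumes.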
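The known-operation check keeps the module's existing `defaultdict(bool)` lookup idiom (the same pattern used for `excluded_ips`): unseen keys fall back to `bool()`, i.e. `False`, instead of raising `KeyError`. A short sketch with an abbreviated table:

```python
import collections

_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool)
for operation_type in ("REST.GET.OBJECT", "BATCH.DELETE.OBJECT"):  # abbreviated
    _IS_OPERATION_TYPE_KNOWN[operation_type] = True

assert _IS_OPERATION_TYPE_KNOWN["REST.GET.OBJECT"] is True
# Truncated forms such as "GET" no longer match anything:
assert _IS_OPERATION_TYPE_KNOWN["GET"] is False
```

A plain `set` membership test would behave the same here; one minor trade-off of the `defaultdict` form is that every missed lookup also inserts its key with the default value.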
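Callers now pass the full operation string rather than a bare HTTP verb. A hypothetical invocation (the paths are placeholders; the import path matches the module edited above):

```python
import pathlib

from dandi_s3_log_parser._s3_log_file_parser import parse_raw_s3_log

parse_raw_s3_log(
    raw_s3_log_file_path=pathlib.Path("logs/2022-01-01.log"),  # placeholder path
    parsed_s3_log_folder_path=pathlib.Path("parsed_logs"),  # placeholder path
    bucket="dandiarchive",
    operation_type="REST.GET.OBJECT",  # formerly request_type="GET"
)
```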