diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py
index 1451f3b..28c3ba3 100644
--- a/src/dandi_s3_log_parser/__init__.py
+++ b/src/dandi_s3_log_parser/__init__.py
@@ -6,6 +6,7 @@
 from ._order_and_anonymize_parsed_logs import order_and_anonymize_parsed_logs
 from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
 from ._ip_utils import get_hash_salt
+from ._log_utils import find_all_known_operation_types

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
@@ -15,4 +16,5 @@
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
     "order_and_anonymize_parsed_logs",
+    "find_all_known_operation_types",
 ]
diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
index 6487cf9..27a2b0b 100644
--- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -64,15 +64,16 @@
     """
     base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
     parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
-    excluded_log_files = excluded_log_files or list()
+    excluded_log_files = excluded_log_files or {}
     excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
     excluded_ips = excluded_ips or collections.defaultdict(bool)
     asset_id_handler = _get_default_dandi_asset_id_handler()

+    daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files)
+
     # The .rglob is not naturally sorted; shuffle for more uniform progress updates
-    daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files
-    random.shuffle(list(daily_raw_s3_log_file_paths))
+    random.shuffle(daily_raw_s3_log_file_paths)

     if maximum_number_of_workers == 1:
         for raw_s3_log_file_path in tqdm.tqdm(
diff --git a/src/dandi_s3_log_parser/_globals.py b/src/dandi_s3_log_parser/_globals.py
new file mode 100644
index 0000000..bdf8f8a
--- /dev/null
+++ b/src/dandi_s3_log_parser/_globals.py
@@ -0,0 +1,53 @@
+import collections
+import re
+
+_KNOWN_OPERATION_TYPES = (
+    "REST.GET.OBJECT",
+    "REST.PUT.OBJECT",
+    "REST.HEAD.OBJECT",
+    "REST.POST.OBJECT",
+    "REST.COPY.PART",
+    "REST.COPY.OBJECT_GET",
+    "REST.DELETE.OBJECT",
+    "REST.OPTIONS.PREFLIGHT",
+    "BATCH.DELETE.OBJECT",
+    "WEBSITE.GET.OBJECT",
+    "REST.GET.BUCKETVERSIONS",
+    "REST.GET.BUCKET",
+)
+
+_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool)
+for request_type in _KNOWN_OPERATION_TYPES:
+    _IS_OPERATION_TYPE_KNOWN[request_type] = True
+
+_FULL_PATTERN_TO_FIELD_MAPPING = [
+    "bucket_owner",
+    "bucket",
+    "timestamp",
+    "ip_address",
+    "requester",
+    "request_id",
+    "operation",
+    "asset_id",
+    "request_uri",
+    # "http_version",  # Regex not splitting this from the request_uri...
+ "status_code", + "error_code", + "bytes_sent", + "object_size", + "total_time", + "turn_around_time", + "referrer", + "user_agent", + "version", + "host_id", + "sigv", + "cipher_suite", + "auth_type", + "endpoint", + "tls_version", + "access_point_arn", +] +_FullLogLine = collections.namedtuple("FullLogLine", _FULL_PATTERN_TO_FIELD_MAPPING) + +_S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)') diff --git a/src/dandi_s3_log_parser/_log_utils.py b/src/dandi_s3_log_parser/_log_utils.py new file mode 100644 index 0000000..ad43774 --- /dev/null +++ b/src/dandi_s3_log_parser/_log_utils.py @@ -0,0 +1,35 @@ +import pathlib + +import tqdm +from pydantic import DirectoryPath, FilePath, validate_call + +from ._buffered_text_reader import BufferedTextReader + + +@validate_call +def find_all_known_operation_types( + base_raw_s3_log_folder_path: DirectoryPath, excluded_log_files: list[FilePath] | None +) -> set: + excluded_log_files = excluded_log_files or {} + excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files} + + daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files) + + unique_operation_types = set() + for raw_s3_log_file_path in tqdm.tqdm( + iterable=daily_raw_s3_log_file_paths, + desc="Extracting operation types from log files...", + position=0, + leave=True, + ): + # The start of each line should be regular enough to reliably slice out just the span of the operation type + # (plus some extra bits on the end from irregularly of operation type length) + operation_types_per_file = { + raw_log_line[136:160].split(" ")[0] + for buffered_text_reader in BufferedTextReader(file_path=raw_s3_log_file_path) + for raw_log_line in buffered_text_reader + } + + unique_operation_types.update(operation_types_per_file) + + return unique_operation_types diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index 4134b20..7886deb 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -15,61 +15,17 @@ import datetime import importlib.metadata import pathlib -import re from collections.abc import Callable from typing import Literal from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH - -_KNOWN_OPERATION_TYPES = ( - "REST.GET.OBJECT", - "REST.PUT.OBJECT", - "REST.HEAD.OBJECT", - "REST.POST.OBJECT", - "REST.COPY.PART", - "REST.COPY.OBJECT_GET", - "REST.DELETE.OBJECT", - "REST.OPTIONS.PREFLIGHT", - "BATCH.DELETE.OBJECT", - "WEBSITE.GET.OBJECT", +from ._globals import ( + _IS_OPERATION_TYPE_KNOWN, + _KNOWN_OPERATION_TYPES, + _S3_LOG_REGEX, + _FullLogLine, ) -_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool) -for request_type in _KNOWN_OPERATION_TYPES: - _IS_OPERATION_TYPE_KNOWN[request_type] = True - -_FULL_PATTERN_TO_FIELD_MAPPING = [ - "bucket_owner", - "bucket", - "timestamp", - "ip_address", - "requester", - "request_id", - "operation", - "asset_id", - "request_uri", - # "http_version", # Regex not splitting this from the request_uri... 
- "status_code", - "error_code", - "bytes_sent", - "object_size", - "total_time", - "turn_around_time", - "referrer", - "user_agent", - "version", - "host_id", - "sigv", - "cipher_suite", - "auth_type", - "endpoint", - "tls_version", - "access_point_arn", -] -_FullLogLine = collections.namedtuple("FullLogLine", _FULL_PATTERN_TO_FIELD_MAPPING) - -_S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)') - def _append_reduced_log_line( *, @@ -146,11 +102,9 @@ def asset_id_handler(*, raw_asset_id: str) -> str: with open(file=lines_errors_file_path, mode="a") as io: io.write(message) - handled_operation_type = full_log_line.operation - if _IS_OPERATION_TYPE_KNOWN[handled_operation_type] is False: + if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False: message = ( - f"Unexpected request type: '{handled_operation_type}' handled from '{full_log_line.operation}' " - f"on line {line_index} of file {log_file_path}.\n\n" + f"Unexpected request type: '{full_log_line.operation}' on line {line_index} of file {log_file_path}.\n\n" ) with open(file=lines_errors_file_path, mode="a") as io: io.write(message) @@ -166,7 +120,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: if full_log_line.status_code[0] != "2": return - if handled_operation_type != operation_type: + if full_log_line.operation != operation_type: return if excluded_ips[full_log_line.ip_address] is True: