put together simple util to scan for operation values
CodyCBakerPhD committed Aug 14, 2024
1 parent a6ba38d commit e063f18
Showing 5 changed files with 102 additions and 57 deletions.
2 changes: 2 additions & 0 deletions src/dandi_s3_log_parser/__init__.py
@@ -6,6 +6,7 @@
 from ._order_and_anonymize_parsed_logs import order_and_anonymize_parsed_logs
 from ._dandi_s3_log_file_parser import parse_dandi_raw_s3_log, parse_all_dandi_raw_s3_logs
 from ._ip_utils import get_hash_salt
+from ._log_utils import find_all_known_operation_types

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
@@ -15,4 +16,5 @@
     "parse_dandi_raw_s3_log",
     "parse_all_dandi_raw_s3_logs",
     "order_and_anonymize_parsed_logs",
+    "find_all_known_operation_types",
 ]
7 changes: 4 additions & 3 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -64,15 +64,16 @@ def parse_all_dandi_raw_s3_logs(
     """
     base_raw_s3_log_folder_path = pathlib.Path(base_raw_s3_log_folder_path)
     parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path)
-    excluded_log_files = excluded_log_files or list()
+    excluded_log_files = excluded_log_files or {}
     excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}
     excluded_ips = excluded_ips or collections.defaultdict(bool)

     asset_id_handler = _get_default_dandi_asset_id_handler()

+    daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files)
+
     # The .rglob is not naturally sorted; shuffle for more uniform progress updates
-    daily_raw_s3_log_file_paths = set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files
-    random.shuffle(list(daily_raw_s3_log_file_paths))
+    random.shuffle(daily_raw_s3_log_file_paths)

     if maximum_number_of_workers == 1:
         for raw_s3_log_file_path in tqdm.tqdm(
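This hunk fixes a real bug: random.shuffle operates in place and returns None, so the old code shuffled a throwaway copy (list(daily_raw_s3_log_file_paths)) while iteration still walked the unshuffled set; shuffling also requires a mutable sequence, not a set. A minimal sketch of the corrected pattern, with a placeholder folder:

import pathlib
import random

# rglob yields paths in filesystem order; shuffling spreads large and small
# files out so the tqdm rate estimate stays more uniform across the run.
daily_log_file_paths = list(pathlib.Path("/data/raw_logs").rglob("*.log"))  # placeholder folder
random.shuffle(daily_log_file_paths)  # in place; passing a set would raise a TypeError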
53 changes: 53 additions & 0 deletions src/dandi_s3_log_parser/_globals.py
@@ -0,0 +1,53 @@
import collections
import re

_KNOWN_OPERATION_TYPES = (
    "REST.GET.OBJECT",
    "REST.PUT.OBJECT",
    "REST.HEAD.OBJECT",
    "REST.POST.OBJECT",
    "REST.COPY.PART",
    "REST.COPY.OBJECT_GET",
    "REST.DELETE.OBJECT",
    "REST.OPTIONS.PREFLIGHT",
    "BATCH.DELETE.OBJECT",
    "WEBSITE.GET.OBJECT",
    "REST.GET.BUCKETVERSIONS",
    "REST.GET.BUCKET",
)

_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool)
for request_type in _KNOWN_OPERATION_TYPES:
    _IS_OPERATION_TYPE_KNOWN[request_type] = True

_FULL_PATTERN_TO_FIELD_MAPPING = [
    "bucket_owner",
    "bucket",
    "timestamp",
    "ip_address",
    "requester",
    "request_id",
    "operation",
    "asset_id",
    "request_uri",
    # "http_version",  # Regex not splitting this from the request_uri...
    "status_code",
    "error_code",
    "bytes_sent",
    "object_size",
    "total_time",
    "turn_around_time",
    "referrer",
    "user_agent",
    "version",
    "host_id",
    "sigv",
    "cipher_suite",
    "auth_type",
    "endpoint",
    "tls_version",
    "access_point_arn",
]
_FullLogLine = collections.namedtuple("FullLogLine", _FULL_PATTERN_TO_FIELD_MAPPING)

_S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)')
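Taken together, these globals support a parse-then-classify flow. A minimal sketch, assuming a shortened, hypothetical log fragment (a real S3 access log line carries all of the fields named above):

import collections
import re

_S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)')

# Hypothetical fragment mixing bare, bracketed, and quoted fields.
example_line = '8787a53 dandiarchive [31/Dec/2023:23:06:42 +0000] 192.0.2.3 - ABC123 REST.GET.OBJECT blobs/abc "GET /blobs/abc HTTP/1.1" 200'

# Exactly one of the three capture groups matches per field; keep the non-empty one.
fields = [quoted or bracketed or bare for quoted, bracketed, bare in _S3_LOG_REGEX.findall(example_line)]
print(fields[6])  # REST.GET.OBJECT, the "operation" position in the field mapping

# The defaultdict(bool) lookup returns False for any operation never marked True.
is_known = collections.defaultdict(bool)
is_known["REST.GET.OBJECT"] = True
print(is_known[fields[6]])           # True
print(is_known["REST.GET.TAGGING"])  # False -> would be reported as unexpected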
35 changes: 35 additions & 0 deletions src/dandi_s3_log_parser/_log_utils.py
@@ -0,0 +1,35 @@
import pathlib

import tqdm
from pydantic import DirectoryPath, FilePath, validate_call

from ._buffered_text_reader import BufferedTextReader


@validate_call
def find_all_known_operation_types(
    base_raw_s3_log_folder_path: DirectoryPath, excluded_log_files: list[FilePath] | None
) -> set:
    excluded_log_files = excluded_log_files or {}
    excluded_log_files = {pathlib.Path(excluded_log_file) for excluded_log_file in excluded_log_files}

    daily_raw_s3_log_file_paths = list(set(base_raw_s3_log_folder_path.rglob(pattern="*.log")) - excluded_log_files)

    unique_operation_types = set()
    for raw_s3_log_file_path in tqdm.tqdm(
        iterable=daily_raw_s3_log_file_paths,
        desc="Extracting operation types from log files...",
        position=0,
        leave=True,
    ):
        # The start of each line should be regular enough to reliably slice out just the span of the operation type
        # (plus some extra characters on the end from the irregularity of operation type lengths)
        operation_types_per_file = {
            raw_log_line[136:160].split(" ")[0]
            for buffered_text_reader in BufferedTextReader(file_path=raw_s3_log_file_path)
            for raw_log_line in buffered_text_reader
        }

        unique_operation_types.update(operation_types_per_file)

    return unique_operation_types
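A hypothetical invocation of the new utility (the folder path is a placeholder; the fixed-offset slice assumes DANDI's raw log lines are regular enough that the operation field always starts near column 136):

from dandi_s3_log_parser import find_all_known_operation_types

unique_operation_types = find_all_known_operation_types(
    base_raw_s3_log_folder_path="/mnt/backup/dandi/raw_logs",  # placeholder path
    excluded_log_files=None,
)
print(unique_operation_types)  # e.g., {'REST.GET.OBJECT', 'REST.HEAD.OBJECT', ...}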
62 changes: 8 additions & 54 deletions src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -15,61 +15,17 @@
 import datetime
 import importlib.metadata
 import pathlib
-import re
 from collections.abc import Callable
 from typing import Literal

 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
-
-_KNOWN_OPERATION_TYPES = (
-    "REST.GET.OBJECT",
-    "REST.PUT.OBJECT",
-    "REST.HEAD.OBJECT",
-    "REST.POST.OBJECT",
-    "REST.COPY.PART",
-    "REST.COPY.OBJECT_GET",
-    "REST.DELETE.OBJECT",
-    "REST.OPTIONS.PREFLIGHT",
-    "BATCH.DELETE.OBJECT",
-    "WEBSITE.GET.OBJECT",
+from ._globals import (
+    _IS_OPERATION_TYPE_KNOWN,
+    _KNOWN_OPERATION_TYPES,
+    _S3_LOG_REGEX,
+    _FullLogLine,
 )
-
-_IS_OPERATION_TYPE_KNOWN = collections.defaultdict(bool)
-for request_type in _KNOWN_OPERATION_TYPES:
-    _IS_OPERATION_TYPE_KNOWN[request_type] = True
-
-_FULL_PATTERN_TO_FIELD_MAPPING = [
-    "bucket_owner",
-    "bucket",
-    "timestamp",
-    "ip_address",
-    "requester",
-    "request_id",
-    "operation",
-    "asset_id",
-    "request_uri",
-    # "http_version",  # Regex not splitting this from the request_uri...
-    "status_code",
-    "error_code",
-    "bytes_sent",
-    "object_size",
-    "total_time",
-    "turn_around_time",
-    "referrer",
-    "user_agent",
-    "version",
-    "host_id",
-    "sigv",
-    "cipher_suite",
-    "auth_type",
-    "endpoint",
-    "tls_version",
-    "access_point_arn",
-]
-_FullLogLine = collections.namedtuple("FullLogLine", _FULL_PATTERN_TO_FIELD_MAPPING)
-
-_S3_LOG_REGEX = re.compile(pattern=r'"([^"]+)"|\[([^]]+)]|([^ ]+)')


def _append_reduced_log_line(
*,
@@ -146,11 +102,9 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
         with open(file=lines_errors_file_path, mode="a") as io:
             io.write(message)

-    handled_operation_type = full_log_line.operation
-    if _IS_OPERATION_TYPE_KNOWN[handled_operation_type] is False:
+    if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False:
         message = (
-            f"Unexpected request type: '{handled_operation_type}' handled from '{full_log_line.operation}' "
-            f"on line {line_index} of file {log_file_path}.\n\n"
+            f"Unexpected request type: '{full_log_line.operation}' on line {line_index} of file {log_file_path}.\n\n"
         )
         with open(file=lines_errors_file_path, mode="a") as io:
             io.write(message)
@@ -166,7 +120,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
     if full_log_line.status_code[0] != "2":
         return

-    if handled_operation_type != operation_type:
+    if full_log_line.operation != operation_type:
         return

     if excluded_ips[full_log_line.ip_address] is True:
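The excluded_ips filter at the end of this hunk relies on the same defaultdict(bool) idiom as the operation lookup: unseen keys yield False, so no explicit membership test is needed. A small sketch (the IPs are placeholders):

import collections

excluded_ips = collections.defaultdict(bool)
excluded_ips["192.0.2.3"] = True  # hypothetical IP to exclude

print(excluded_ips["192.0.2.3"])     # True  -> the log line is skipped
print(excluded_ips["198.51.100.7"])  # False -> the log line is kept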
