From 7830b835d02f5429857b2f841739c5227b90a77b Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Thu, 15 Aug 2024 14:49:39 -0400 Subject: [PATCH] see if skipping validation is faster --- .../_dandi_s3_log_file_parser.py | 4 ++ .../_s3_log_file_parser.py | 4 ++ .../_s3_log_line_parser.py | 64 ++++++++++--------- 3 files changed, 42 insertions(+), 30 deletions(-) diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py index 27a2b0b..302e09f 100644 --- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py @@ -85,6 +85,7 @@ def parse_all_dandi_raw_s3_logs( parse_dandi_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, parsed_s3_log_folder_path=parsed_s3_log_folder_path, + validate=False, mode="a", excluded_ips=excluded_ips, asset_id_handler=asset_id_handler, @@ -216,6 +217,7 @@ def _multi_worker_parse_dandi_raw_s3_log( parse_dandi_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, parsed_s3_log_folder_path=per_worker_temporary_folder_path, + validate=False, mode="a", excluded_ips=excluded_ips, asset_id_handler=asset_id_handler, @@ -234,6 +236,7 @@ def parse_dandi_raw_s3_log( *, raw_s3_log_file_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, + validate: bool = True, mode: Literal["w", "a"] = "a", excluded_ips: collections.defaultdict[str, bool] | None = None, asset_id_handler: Callable | None = None, @@ -287,6 +290,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: parse_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, parsed_s3_log_folder_path=parsed_s3_log_folder_path, + validate=validate, mode=mode, bucket=bucket, operation_type=operation_type, diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_parser.py index 19266fc..a3c5b8a 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_parser.py @@ -19,6 +19,7 @@ def parse_raw_s3_log( *, raw_s3_log_file_path: str | pathlib.Path, parsed_s3_log_folder_path: str | pathlib.Path, + validate: bool = True, mode: Literal["w", "a"] = "a", bucket: str | None = None, operation_type: Literal[_KNOWN_OPERATION_TYPES] = "REST.GET.OBJECT", @@ -79,6 +80,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: reduced_and_binned_logs = _get_reduced_and_binned_log_lines( raw_s3_log_file_path=raw_s3_log_file_path, + validate=validate, asset_id_handler=asset_id_handler, bucket=bucket, operation_type=operation_type, @@ -99,6 +101,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: def _get_reduced_and_binned_log_lines( *, raw_s3_log_file_path: pathlib.Path, + validate: bool, asset_id_handler: Callable, bucket: str, operation_type: Literal[_KNOWN_OPERATION_TYPES], @@ -163,6 +166,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str: _append_reduced_log_line( raw_line=raw_line, reduced_and_binned_logs=reduced_and_binned_logs, + validate=validate, asset_id_handler=asset_id_handler, bucket=bucket, operation_type=operation_type, diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index 5e57c30..1a0d5db 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -31,6 +31,7 @@ def _append_reduced_log_line( *, raw_line: str, reduced_and_binned_logs: collections.defaultdict[str, dict[str, list[str | int]]], + validate: bool, asset_id_handler: Callable, bucket: str, operation_type: Literal[_KNOWN_OPERATION_TYPES], @@ -86,38 +87,41 @@ def asset_id_handler(*, raw_asset_id: str) -> str: if full_log_line.bucket != bucket: return None - # Apply some minimal validation and contribute any invalidations to error collection - # These might slow parsing down a bit, but could be important to ensuring accuracy - errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" - errors_folder_path.mkdir(exist_ok=True) - - dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") - date = datetime.datetime.now().strftime("%y%m%d") - lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt" - - if not full_log_line.status_code.isdigit(): - message = ( - f"Unexpected status code: '{full_log_line.status_code}' on line {line_index} of file {log_file_path}.\n\n" - ) - with open(file=lines_errors_file_path, mode="a") as io: - io.write(message) - return None + if validate: + # Apply some minimal validation and contribute any invalidations to error collection + # These might slow parsing down a bit, but could be important to ensuring accuracy + errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors" + errors_folder_path.mkdir(exist_ok=True) - if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False: - message = ( - f"Unexpected request type: '{full_log_line.operation}' on line {line_index} of file {log_file_path}.\n\n" - ) - with open(file=lines_errors_file_path, mode="a") as io: - io.write(message) - return None + dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser") + date = datetime.datetime.now().strftime("%y%m%d") + lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt" - timezone = full_log_line.timestamp[-5:] - is_timezone_utc = timezone != "+0000" - if is_timezone_utc: - message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`.\n\n" - with open(file=lines_errors_file_path, mode="a") as io: - io.write(message) - # Fine to continue here + if not full_log_line.status_code.isdigit(): + message = ( + f"Unexpected status code: '{full_log_line.status_code}' " + f"on line {line_index} of file {log_file_path}.\n\n" + ) + with open(file=lines_errors_file_path, mode="a") as io: + io.write(message) + return None + + if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False: + message = ( + f"Unexpected request type: '{full_log_line.operation}' " + f"on line {line_index} of file {log_file_path}.\n\n" + ) + with open(file=lines_errors_file_path, mode="a") as io: + io.write(message) + return None + + timezone = full_log_line.timestamp[-5:] + is_timezone_utc = timezone != "+0000" + if is_timezone_utc: + message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`.\n\n" + with open(file=lines_errors_file_path, mode="a") as io: + io.write(message) + # Fine to continue here # More early skip conditions after validation # Only accept 200-block status codes