
Commit

see if skipping validation is faster
CodyCBakerPhD committed Aug 15, 2024
1 parent 5f242c4 commit 7830b83
Showing 3 changed files with 42 additions and 30 deletions.
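For reference, a minimal sketch of how a caller might opt out of the new per-line validation, assuming `parse_dandi_raw_s3_log` is exported at the package level (otherwise import it from `dandi_s3_log_parser._dandi_s3_log_file_parser`); the file paths are hypothetical placeholders:

    import pathlib

    from dandi_s3_log_parser import parse_dandi_raw_s3_log  # assumed top-level export

    # Hypothetical inputs; point these at a real raw S3 log file and an output folder.
    raw_s3_log_file_path = pathlib.Path("raw_logs/2024-08-15-12-00-00-EXAMPLE")
    parsed_s3_log_folder_path = pathlib.Path("parsed_logs")

    parse_dandi_raw_s3_log(
        raw_s3_log_file_path=raw_s3_log_file_path,
        parsed_s3_log_folder_path=parsed_s3_log_folder_path,
        validate=False,  # skip the per-line checks added in this commit; defaults to True
        mode="a",
    )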
4 changes: 4 additions & 0 deletions src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
@@ -85,6 +85,7 @@ def parse_all_dandi_raw_s3_logs(
parse_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
validate=False,
mode="a",
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
@@ -216,6 +217,7 @@ def _multi_worker_parse_dandi_raw_s3_log(
parse_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=per_worker_temporary_folder_path,
validate=False,
mode="a",
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
@@ -234,6 +236,7 @@ def parse_dandi_raw_s3_log(
*,
raw_s3_log_file_path: str | pathlib.Path,
parsed_s3_log_folder_path: str | pathlib.Path,
validate: bool = True,
mode: Literal["w", "a"] = "a",
excluded_ips: collections.defaultdict[str, bool] | None = None,
asset_id_handler: Callable | None = None,
@@ -287,6 +290,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
parse_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=parsed_s3_log_folder_path,
validate=validate,
mode=mode,
bucket=bucket,
operation_type=operation_type,
4 changes: 4 additions & 0 deletions src/dandi_s3_log_parser/_s3_log_file_parser.py
@@ -19,6 +19,7 @@ def parse_raw_s3_log(
*,
raw_s3_log_file_path: str | pathlib.Path,
parsed_s3_log_folder_path: str | pathlib.Path,
validate: bool = True,
mode: Literal["w", "a"] = "a",
bucket: str | None = None,
operation_type: Literal[_KNOWN_OPERATION_TYPES] = "REST.GET.OBJECT",
@@ -79,6 +80,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:

reduced_and_binned_logs = _get_reduced_and_binned_log_lines(
raw_s3_log_file_path=raw_s3_log_file_path,
validate=validate,
asset_id_handler=asset_id_handler,
bucket=bucket,
operation_type=operation_type,
@@ -99,6 +101,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
def _get_reduced_and_binned_log_lines(
*,
raw_s3_log_file_path: pathlib.Path,
validate: bool,
asset_id_handler: Callable,
bucket: str,
operation_type: Literal[_KNOWN_OPERATION_TYPES],
@@ -163,6 +166,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
_append_reduced_log_line(
raw_line=raw_line,
reduced_and_binned_logs=reduced_and_binned_logs,
validate=validate,
asset_id_handler=asset_id_handler,
bucket=bucket,
operation_type=operation_type,
64 changes: 34 additions & 30 deletions src/dandi_s3_log_parser/_s3_log_line_parser.py
@@ -31,6 +31,7 @@ def _append_reduced_log_line(
*,
raw_line: str,
reduced_and_binned_logs: collections.defaultdict[str, dict[str, list[str | int]]],
validate: bool,
asset_id_handler: Callable,
bucket: str,
operation_type: Literal[_KNOWN_OPERATION_TYPES],
@@ -86,38 +87,41 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
if full_log_line.bucket != bucket:
return None

# Apply some minimal validation and contribute any invalidations to error collection
# These might slow parsing down a bit, but could be important to ensuring accuracy
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt"

if not full_log_line.status_code.isdigit():
message = (
f"Unexpected status code: '{full_log_line.status_code}' on line {line_index} of file {log_file_path}.\n\n"
)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
return None
if validate:
# Apply some minimal validation and contribute any invalidations to error collection
# These might slow parsing down a bit, but could be important to ensuring accuracy
errors_folder_path = DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH / "errors"
errors_folder_path.mkdir(exist_ok=True)

if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False:
message = (
f"Unexpected request type: '{full_log_line.operation}' on line {line_index} of file {log_file_path}.\n\n"
)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
return None
dandi_s3_log_parser_version = importlib.metadata.version(distribution_name="dandi_s3_log_parser")
date = datetime.datetime.now().strftime("%y%m%d")
lines_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_line_errors_{task_id}.txt"

timezone = full_log_line.timestamp[-5:]
is_timezone_utc = timezone != "+0000"
if is_timezone_utc:
message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`.\n\n"
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
# Fine to continue here
if not full_log_line.status_code.isdigit():
message = (
f"Unexpected status code: '{full_log_line.status_code}' "
f"on line {line_index} of file {log_file_path}.\n\n"
)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
return None

if _IS_OPERATION_TYPE_KNOWN[full_log_line.operation] is False:
message = (
f"Unexpected request type: '{full_log_line.operation}' "
f"on line {line_index} of file {log_file_path}.\n\n"
)
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
return None

timezone = full_log_line.timestamp[-5:]
is_timezone_utc = timezone != "+0000"
if is_timezone_utc:
message = f"Unexpected time shift attached to log! Have always seen '+0000', found `{timezone=}`.\n\n"
with open(file=lines_errors_file_path, mode="a") as io:
io.write(message)
# Fine to continue here

# More early skip conditions after validation
# Only accept 200-block status codes
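To check the commit's premise that skipping validation is faster, a rough timing sketch (same assumptions and hypothetical paths as above; simple wall-clock timing via `time.perf_counter`, not a rigorous benchmark):

    import pathlib
    import time

    from dandi_s3_log_parser import parse_dandi_raw_s3_log  # assumed top-level export

    def _time_parse(validate: bool) -> float:
        # Hypothetical inputs; reuse the same raw log so both runs do identical work.
        raw_s3_log_file_path = pathlib.Path("raw_logs/2024-08-15-12-00-00-EXAMPLE")
        parsed_s3_log_folder_path = pathlib.Path(f"parsed_logs_validate_{validate}")

        start = time.perf_counter()
        parse_dandi_raw_s3_log(
            raw_s3_log_file_path=raw_s3_log_file_path,
            parsed_s3_log_folder_path=parsed_s3_log_folder_path,
            validate=validate,
            mode="w",  # overwrite so repeated runs stay comparable
        )
        return time.perf_counter() - start

    print(f"validate=True:  {_time_parse(validate=True):.2f} s")
    print(f"validate=False: {_time_parse(validate=False):.2f} s")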
