diff --git a/per-message-s3-exporter/firehose_reader.py b/per-message-s3-exporter/firehose_reader.py index 990381b..9477cea 100644 --- a/per-message-s3-exporter/firehose_reader.py +++ b/per-message-s3-exporter/firehose_reader.py @@ -7,6 +7,7 @@ import logging import os from pathlib import Path +import re import ssl import time import warnings @@ -15,6 +16,7 @@ import attr +EVENTS_RE: re.Pattern = re.compile(r"events\s+\"(?P[a-z_ ]+)\"") FIREHOSE_MESSAGE_TYPES: List[str] = [ "arrival", "cancellation", @@ -62,7 +64,17 @@ def pitr_map_location() -> Path: return Path(os.getenv("PITR_MAP", "/home/firestarter/pitrs/.pitrs-map")) -def pitr_map_from_file(init_time: str) -> Dict[str, int]: +def pitr_map_message_types(init_args: str) -> List[str]: + """Return a list of all requested message types or all valid message types + if the initiation command does not specify a subset""" + events_match = EVENTS_RE.match(init_time) + if events_match is None: + return FIREHOSE_MESSAGE_TYPES + + return events_match["events"] + + +def pitr_map_from_file(init_time: str, init_args: str) -> Dict[str, int]: """Load PITR map from file, filling in a PITR value based on the init_time when appropriate @@ -78,7 +90,8 @@ def pitr_map_from_file(init_time: str) -> Dict[str, int]: return {} start_pitr_from_env = int(init_time.split()[-1]) - pitr_map = {message_type: start_pitr_from_env for message_type in FIREHOSE_MESSAGE_TYPES} + message_types = pitr_map_message_types(init_args) + pitr_map = {message_type: start_pitr_from_env for message_type in message_types} logging.info(f"Fetching start PITRs from {pitr_map_path}") with pitr_map_path.open(encoding="utf-8") as pitr_map_file: @@ -117,13 +130,29 @@ def from_env(cls): init_time = os.environ.get("INIT_CMD_TIME", "live") init_time_split = init_time.split() + init_args = os.environ.get("INIT_CMD_ARGS", "") + for command in [ + "live", + "pitr", + "range", + "compression", + "keepalive", + "username", + "password", + ]: + if command in init_args.split(): + raise ValueError( + f'$INIT_CMD_ARGS should not contain the "{command}" command. ' + "It belongs in its own variable." + ) + if init_time_split[0] not in ("live", "pitr", "range"): raise ValueError( '$INIT_CMD_TIME value is invalid, should be "live", ' '"pitr " or "range "' ) - pitr_map = pitr_map_from_file(init_time) + pitr_map = pitr_map_from_file(init_time, init_args) if pitr_map: min_pitr = min(pitr_map.values()) logging.info(f"Based on PITR map {pitr_map}") @@ -135,22 +164,6 @@ def from_env(cls): init_time_split[1] = f"{min_pitr}" init_time = " ".join(init_time_split) - init_args = os.environ.get("INIT_CMD_ARGS", "") - for command in [ - "live", - "pitr", - "range", - "compression", - "keepalive", - "username", - "password", - ]: - if command in init_args.split(): - raise ValueError( - f'$INIT_CMD_ARGS should not contain the "{command}" command. ' - "It belongs in its own variable." - ) - return cls( username=username, password=apikey,