feat(logfiles): add function for checking expected messages
Refactored the `expect_errors` function by extracting the log message
checking logic into a separate `_check_msgs_presence_in_logs` function.
Added a new `expect_messages` context manager for checking expected
messages in logs. This improves code readability and reusability.
mkoura committed Nov 20, 2024
1 parent 7ed3393 commit 9da8721
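
The commit introduces two entry points: `expect_errors`, which also registers the expected errors as ignore rules, and the new `expect_messages`, which only checks that messages appear. A minimal usage sketch based on the diff below — the test names, log-file globs, and regexes are illustrative assumptions, not code from this commit:

from cardano_node_tests.utils import logfiles

def test_rejects_bad_tx(worker_id: str) -> None:
    # Hypothetical test: the expected error is also added as an ignore rule
    # for this pytest-xdist worker, so later error scans will skip it
    with logfiles.expect_errors(
        regex_pairs=[("bft1.stdout", "ValidationError")],
        worker_id=worker_id,
    ):
        ...  # run the action that is expected to log the error

def test_reports_progress() -> None:
    # Hypothetical test: plain message check, no ignore rules are added
    with logfiles.expect_messages([("bft1.stdout", "Chain extended")]):
        ...  # run the action that is expected to log the message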
Showing 1 changed file with 71 additions and 28 deletions.
99 changes: 71 additions & 28 deletions cardano_node_tests/utils/logfiles.py
@@ -270,33 +270,13 @@ def add_ignore_rule(
            infile.write(f"{files_glob};;{skip_after};;{regex}\n")


@contextlib.contextmanager
def expect_errors(regex_pairs: tp.List[tp.Tuple[str, str]], worker_id: str) -> tp.Iterator[None]:
    """Make sure the expected errors are present in logs.

    Args:
        regex_pairs: [(glob, regex)] - A list of regexes that need to be present in files
            described by the glob.
        worker_id: The id of the pytest-xdist worker (the `worker_id` fixture) that the test
            is running on.
    """
    state_dir = cluster_nodes.get_cluster_env().state_dir

    glob_list = []
    for files_glob, regex in regex_pairs:
        add_ignore_rule(files_glob=files_glob, regex=regex, ignore_file_id=worker_id)
        glob_list.append(files_glob)
    # Resolve the globs
    _expanded_paths = [list(state_dir.glob(glob_item)) for glob_item in glob_list]
    # Flatten the list
    expanded_paths = list(itertools.chain.from_iterable(_expanded_paths))
    # Record each end-of-file as a starting offset for searching the log file
    seek_offsets = {str(p): helpers.get_eof_offset(p) for p in expanded_paths}

    timestamp = time.time()

    yield

def _check_msgs_presence_in_logs(
    regex_pairs: tp.List[tp.Tuple[str, str]],
    seek_offsets: tp.Dict[str, int],
    state_dir: pl.Path,
    timestamp: float,
) -> None:
    """Make sure the expected messages are present in logs."""
    errors = []
    for files_glob, regex in regex_pairs:
        regex_comp = re.compile(regex)
@@ -307,7 +287,7 @@ def expect_errors(regex_pairs: tp.List[tp.Tuple[str, str]], worker_id: str) -> t
            if ROTATED_RE.match(logfile):
                continue

            # Search for the expected error
            # Search for the expected string
            seek = seek_offsets.get(logfile) or 0
            line_found = False
            for logfile_rec in _get_rotated_logs(
@@ -329,6 +309,69 @@ def expect_errors(regex_pairs: tp.List[tp.Tuple[str, str]], worker_id: str) -> t
        raise AssertionError(errors_joined) from None


@contextlib.contextmanager
def expect_errors(regex_pairs: tp.List[tp.Tuple[str, str]], worker_id: str) -> tp.Iterator[None]:
    """Make sure the expected errors are present in logs.

    Context manager.

    Args:
        regex_pairs: [(glob, regex)] - A list of regexes that need to be present in files
            described by the glob.
        worker_id: The id of the pytest-xdist worker (the `worker_id` fixture) that the test
            is running on.
    """
    state_dir = cluster_nodes.get_cluster_env().state_dir

    glob_list = []
    for files_glob, regex in regex_pairs:
        add_ignore_rule(files_glob=files_glob, regex=regex, ignore_file_id=worker_id)
        glob_list.append(files_glob)
    # Resolve the globs
    _expanded_paths = [list(state_dir.glob(glob_item)) for glob_item in glob_list]
    # Flatten the list
    expanded_paths = list(itertools.chain.from_iterable(_expanded_paths))
    # Record each end-of-file as a starting offset for searching the log file
    seek_offsets = {str(p): helpers.get_eof_offset(p) for p in expanded_paths}

    timestamp = time.time()

    yield

    _check_msgs_presence_in_logs(
        regex_pairs=regex_pairs, seek_offsets=seek_offsets, state_dir=state_dir, timestamp=timestamp
    )


@contextlib.contextmanager
def expect_messages(regex_pairs: tp.List[tp.Tuple[str, str]]) -> tp.Iterator[None]:
    """Make sure the expected messages are present in logs.

    Context manager.

    Args:
        regex_pairs: [(glob, regex)] - A list of regexes that need to be present in files
            described by the glob.
    """
    state_dir = cluster_nodes.get_cluster_env().state_dir

    glob_list = [r[0] for r in regex_pairs]
    # Resolve the globs
    _expanded_paths = [list(state_dir.glob(glob_item)) for glob_item in glob_list]
    # Flatten the list
    expanded_paths = list(itertools.chain.from_iterable(_expanded_paths))
    # Record each end-of-file as a starting offset for searching the log file
    seek_offsets = {str(p): helpers.get_eof_offset(p) for p in expanded_paths}

    timestamp = time.time()

    yield

    _check_msgs_presence_in_logs(
        regex_pairs=regex_pairs, seek_offsets=seek_offsets, state_dir=state_dir, timestamp=timestamp
    )


def search_cluster_logs() -> tp.List[tp.Tuple[pl.Path, str]]:
    """Search cluster logs for errors."""
    cluster_env = cluster_nodes.get_cluster_env()
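
Both context managers rely on the same mechanism visible in the diff above: record each matching file's end-of-file offset on entry, yield to the test, then search only content written after that offset. Below is a standalone sketch of that technique using plain file operations — it stands in for the project's `helpers.get_eof_offset` and `_get_rotated_logs` helpers, whose internals are not shown in this diff:

import pathlib
import re

def eof_offset(path: pathlib.Path) -> int:
    # The file's current size is the offset where newly appended content starts
    return path.stat().st_size

def new_matches(path: pathlib.Path, seek: int, regex: str) -> list:
    # Read only the content appended after `seek` and keep the matching lines
    pattern = re.compile(regex)
    with open(path, encoding="utf-8") as infile:
        infile.seek(seek)
        return [line for line in infile if pattern.search(line)]

# Usage: snapshot the offset, run the action, then scan only the new tail
logfile = pathlib.Path("node.log")
seek = eof_offset(logfile)
# ... run the action that should log the expected message ...
assert new_matches(logfile, seek, "expected message"), "message not found in new log content"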
