From ceaa31c86ecc91942c3d0400f2cf6e59378e0351 Mon Sep 17 00:00:00 2001 From: Alp Kose Date: Wed, 6 Sep 2023 16:15:34 +0300 Subject: [PATCH] draft: refactor snapshot compare and config parser --- examples/report/fw2.snapshot | 4 +- examples/report/snapshot_load_compare.py | 104 +++++++++--- panos_upgrade_assurance/snapshot_compare.py | 168 ++++++++++++++++---- panos_upgrade_assurance/utils.py | 119 +++++++++++--- 4 files changed, 326 insertions(+), 69 deletions(-) diff --git a/examples/report/fw2.snapshot b/examples/report/fw2.snapshot index 99397d5..0feec6e 100644 --- a/examples/report/fw2.snapshot +++ b/examples/report/fw2.snapshot @@ -105,7 +105,7 @@ "num-gtpu-pending": "0" }, "content_version": { - "version": "8647-7730" + "version": "8647-7740" }, "arp_table": {}, "license": { @@ -184,4 +184,4 @@ "ethernet1/2": "up", "ethernet1/3": "up" } -} \ No newline at end of file +} diff --git a/examples/report/snapshot_load_compare.py b/examples/report/snapshot_load_compare.py index d34b4b1..806631c 100755 --- a/examples/report/snapshot_load_compare.py +++ b/examples/report/snapshot_load_compare.py @@ -12,14 +12,18 @@ def load_snap(fname: str) -> dict: if __name__ == "__main__": - snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")} + # snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")} + # snapshots = {"fw1": load_snap("arp_table_left.snapshot"), "fw2": load_snap("arp_table_right.snapshot")} + # snapshots = {"fw1": load_snap("lic-1.json"), "fw2": load_snap("lic-2.json")} + snapshots = {"fw1": load_snap("lic-5.json"), "fw2": load_snap("lic-6.json")} + reports = [ - "all", - {"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}}, - {"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}}, - {"nics": {"count_change_threshold": 10}}, - {"license": {"properties": ["!serial"]}}, + # "all", + # {"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}}, + # {"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}}, + # {"nics": {"count_change_threshold": 10}}, + # {"license": {"properties": ["!serial"]}}, # {"license": { # "properties": ["!serial", "!issued", "!authcode", "!expires", "!custom", "non-existing"] # exclude higher level @@ -37,30 +41,92 @@ def load_snap(fname: str) -> dict: # "properties": ["serial", "non-existing"] # compare only requested # }}, # {"license": { - # "properties": ["!issued", "all"] # compare all except + # "properties": ["custom"] # if key exists in some sub-dicts, it will compare all the keys for the other dicts since this will be treated as non-existing key and ignored! NOTE: now it only ignores top level for added/missing # }}, # {"license": { - # "properties": ["!issued", "serial"] # skip one and compare specific ones + # "properties": ["!issued", "all"] # compare all except # }}, # {"license": { - # "properties": ["Logging Service"] # even works for parent level + # "properties": ["!issued", "serial"] # skip one and compare specific ones # }}, + # {"license": { # "properties": ["Logging Service", "!custom"] # combination with parent # }}, # "!license", # "license", + # {"license": { + # "properties": ["!Logging Service","!X feature"] + # }}, + + ######## Top level keys - not intented but works + # {"license": { + # "properties": ["Logging Service"] # even works for parent level + # }}, + # {"license": { + # "properties": ["!Logging Service"] # even works for parent level + # }}, + # {"license": { + # "properties": ["issued", "PAN-DB URL Filtering"] # even works for parent level - but without multi level + # }}, + # {"license": { + # "properties": ["X feature"] + # }}, + # {"license": { + # "properties": ["!X feature"] + # }}, + # {"license": { + # "properties": ["C feature"] + # }}, + + ######## 1st and 2nd level keys + # {"license": { + # "properties": ["!_Log_Storage_TB"] # works.. + # }}, + # {"license": { + # "properties": ["_Log_Storage_TB"] # works + # }}, + # {"license": { + # "properties": ["_Log_Storage_TB","issued"] # works + # }}, + # {"license": { + # "properties": ["serial", "!logtype"] # works + # }}, + # {"license": { + # "properties": ["custom"] # works + # }}, + # {"license": { + # "properties": ["!custom"] # works + # }}, + # {"license": { + # "properties": ["alogging"] # works + # }}, + # {"license": { + # "properties": ["blogging"] # works + # }}, + # {"license": { + # "properties": ["something"] # since no such key is there it passes - works + # }}, + # {"license": { + # "properties": ["all"] # works + # }}, + # {"license": { + # "properties": ["!logtype"] # works + # }}, + {"license": { + "properties": ["!Logging Service", "issued"] # works + }}, - {"routes": {"properties": ["!flags"], "count_change_threshold": 10}}, - "!content_version", - { - "session_stats": { - "thresholds": [ - {"num-max": 10}, - {"num-tcp": 10}, - ] - } - }, + # {"routes": {"properties": ["!flags"], "count_change_threshold": 10}}, + # "!content_version", + # { + # "session_stats": { + # "thresholds": [ + # {"num-max": 10}, + # {"num-tcp": 10}, + # ] + # } + # }, ] compare = SnapshotCompare( diff --git a/panos_upgrade_assurance/snapshot_compare.py b/panos_upgrade_assurance/snapshot_compare.py index d09efc7..8fcf89c 100644 --- a/panos_upgrade_assurance/snapshot_compare.py +++ b/panos_upgrade_assurance/snapshot_compare.py @@ -1,7 +1,7 @@ from typing import Optional, Union, List, Dict -from panos_upgrade_assurance.utils import ConfigParser, SnapType +from panos_upgrade_assurance.utils import ConfigParser, SnapType, get_all_dict_keys from panos_upgrade_assurance import exceptions - +from itertools import chain class SnapshotCompare: """Class comparing snapshots of Firewall Nodes. @@ -227,6 +227,49 @@ def calculate_change_percentage( result["passed"] = False return result + + + # @staticmethod + # def get_all_keys(nested_dict): # NOTE keep it in snapshotcompare + # keys = [] + # for key, value in nested_dict.items(): + # keys.append(key) + # if isinstance(value, dict): + # keys.extend(SnapshotCompare.get_all_keys(value)) + # return keys + + @staticmethod + def is_key_comparison_allowed(key, properties): + """ + key str + properties List + """ + if properties is None: + return True + + if key in properties: + return True + elif ("!"+key) in properties: + return False + elif "all" in properties: + return True + elif all((element.startswith("!") for element in properties)): # all '!' or empty list + return True + + return False + + + @staticmethod + def is_key_excluded_in_properties(key, properties): + if properties is None: + return False + + if ("!"+key) in properties: + return True + + return False + + @staticmethod def calculate_diff_on_dicts( left_side_to_compare: Dict[str, Union[str, dict]], @@ -392,48 +435,113 @@ def calculate_diff_on_dicts( changed=dict(passed=True, changed_raw={}), ) + chain_unique_set = lambda set1, set2: set(chain(set1,set2)) + missing = left_side_to_compare.keys() - right_side_to_compare.keys() - if missing: - result["missing"]["passed"] = False - for key in missing: + for key in missing: + if SnapshotCompare.is_key_comparison_allowed(key, properties): result["missing"]["missing_keys"].append(key) + result["missing"]["passed"] = False + + # if missing: + # result["missing"]["passed"] = False + # for key in missing: + # result["missing"]["missing_keys"].append(key) added = right_side_to_compare.keys() - left_side_to_compare.keys() - if added: - result["added"]["passed"] = False - for key in added: + for key in added: + if SnapshotCompare.is_key_comparison_allowed(key, properties): result["added"]["added_keys"].append(key) + result["added"]["passed"] = False + + # if added: + # result["added"]["passed"] = False + # for key in added: + # result["added"]["added_keys"].append(key) common_keys = left_side_to_compare.keys() & right_side_to_compare.keys() if common_keys: - keys_to_check = ( - ConfigParser(valid_elements=set(common_keys), - requested_config=properties, - ignore_invalid_config=True).prepare_config() - if properties - else common_keys - ) + print("common keys:",common_keys) + # keys_to_check = ( + # ConfigParser(valid_elements=set(common_keys), + # requested_config=properties, + # ignore_invalid_config=True).prepare_config() + # if properties + # else common_keys + # ) + keys_to_check = common_keys # TODO item_changed = False for key in keys_to_check: + print(f"Checking {key} in {left_side_to_compare}") + # if key == "PAN-DB URL Filtering": + # import pdb; pdb.set_trace() + + if right_side_to_compare[key] != left_side_to_compare[key]: if isinstance(left_side_to_compare[key], str): - result["changed"]["changed_raw"][key] = dict( - left_snap=left_side_to_compare[key], - right_snap=right_side_to_compare[key], - ) - item_changed = True - elif isinstance(left_side_to_compare[key], dict): - nested_results = SnapshotCompare.calculate_diff_on_dicts( - left_side_to_compare=left_side_to_compare[key], - right_side_to_compare=right_side_to_compare[key], - properties=properties, - ) - - SnapshotCompare.calculate_passed(nested_results) - if not nested_results["passed"]: - result["changed"]["changed_raw"][key] = nested_results + + print(f"is key {key} comparison allowed with {properties}") + + if SnapshotCompare.is_key_comparison_allowed(key, properties): + result["changed"]["changed_raw"][key] = dict( + left_snap=left_side_to_compare[key], + right_snap=right_side_to_compare[key], + ) item_changed = True + + elif isinstance(left_side_to_compare[key], dict): + + nested_keys_within_common_key = chain_unique_set(get_all_dict_keys(left_side_to_compare[key]), + get_all_dict_keys(right_side_to_compare[key])) + print("nested keys within common key:",nested_keys_within_common_key) + + # if is key excluded in properties + # continue + # + # if key in properties + # ##maybe remove key from properties + # ##if any nested key in properties; call with properties + # ##else call without properties + # recursive call without properties - DO NOT ALLOW MULTI LEVEL FILTERING.. + # + # if any nested key in properties + # ##filter properties in nested level + # recursive call + # + + if SnapshotCompare.is_key_excluded_in_properties(key, properties): # NOTE should check for exclude explicitly + continue # skip to the next key + + + if properties and key in properties: + # call without properties - DO NOT ALLOW MULTI LEVEL FILTERING.. + nested_results = SnapshotCompare.calculate_diff_on_dicts( + left_side_to_compare=left_side_to_compare[key], + right_side_to_compare=right_side_to_compare[key], + ) + else: + nested_results = SnapshotCompare.calculate_diff_on_dicts( + left_side_to_compare=left_side_to_compare[key], + right_side_to_compare=right_side_to_compare[key], + properties=properties, + ) + + # if any( + # SnapshotCompare.is_key_comparison_allowed(nested_key, properties) + # for nested_key in nested_keys_within_common_key + # ): + # nested_results = SnapshotCompare.calculate_diff_on_dicts( + # left_side_to_compare=left_side_to_compare[key], + # right_side_to_compare=right_side_to_compare[key], + # properties=properties, + # ) + + if nested_results: # TODO remove this + SnapshotCompare.calculate_passed(nested_results) + if not nested_results["passed"]: + result["changed"]["changed_raw"][key] = nested_results + item_changed = True else: raise exceptions.WrongDataTypeException(f"Unknown value format for key {key}.") result["changed"]["passed"] = not item_changed diff --git a/panos_upgrade_assurance/utils.py b/panos_upgrade_assurance/utils.py index 9922fef..0160301 100644 --- a/panos_upgrade_assurance/utils.py +++ b/panos_upgrade_assurance/utils.py @@ -192,13 +192,16 @@ def __init__( if requested_config: # if not None or not empty list self.requested_config = deepcopy(requested_config) - self._requested_config_names = set( - [ConfigParser._extract_element_name(config_keyword) for config_keyword in self.requested_config] - ) + self._requested_config_names = set(self._iter_extracted_config_names(self.requested_config)) + self._requested_all_exclusive = self.is_all_exclusive(self._requested_config_names) + + # self._requested_config_names = set( + # [ConfigParser._extract_element_name(config_keyword) for config_keyword in self.requested_config] + # ) non_existing_keys = set() for config_name in self._requested_config_names: - if not self._is_element_included(element=config_name): + if not self._is_element_valid(element=config_name): non_existing_keys.add(config_name) if not ignore_invalid_config: # if invalid config is not accepted @@ -212,7 +215,20 @@ def __init__( self._requested_config_names = set(valid_elements) self.requested_config = list(valid_elements) # Meaning 'all' valid tests - def _is_element_included(self, element: str) -> bool: + def _get_pure_element_name(self, element): + return element[1:] if element.startswith("!") else element + + @staticmethod + def is_all_exclusive(config): + """Method to check if all config elements are exclusive. + """ + if all((config_name.startswith("!") for config_name in config)): # TODO what to do for empty config + return True + + return False + + + def _is_element_valid(self, element: str) -> bool: """Method verifying if a config element is a correct (supported) value. This method can also handle `not-elements` (see [`dialect`](/panos/docs/panos-upgrade-assurance/dialect)). @@ -226,15 +242,51 @@ def _is_element_included(self, element: str) -> bool: bool: `True` if the value is correct, `False` otherwise. """ - if element in self.valid_elements or (element.startswith("!") and element[1:] in self.valid_elements): + if self._get_pure_element_name(element) in self.valid_elements: return True elif element == "all" and "all" in self.requested_config: return True else: return False + + @staticmethod + def is_element_included(element, config): # TODO use for is_key_comparison_allowed + """Method verifying if a given element should be included according to the config list. + """ + if not config: # if config list is None or empty list it should be included + return True + + if f"!{element}" in config: + return False + elif ( element in config or "all" in config or ConfigParser.is_all_exclusive(config) ): + return True + + return False + + @staticmethod - def _extract_element_name(config: Union[str, dict]) -> str: + def is_element_explicit_excluded(element, config): + """Method verifying if a given element should excluded. + + This method is required for snapshot comparison when we need to check if we should further compare + nested dicts - it doesnot work to check with is_element_included for that case since nested dict key + might not be included but nested keys might be subject to comparison + """ + if config and f"!{element}" in config: # if config isnot none/empty and !element is in config + return True + + return False + + def _iter_extracted_config_names(self, config): + """ + """ + for config_name in config: + yield ConfigParser._extract_element_name(config_name) + + + @staticmethod + def _extract_element_name(config: Union[str, dict]) -> str: # TODO may change param name to config_element """Method that extracts the name from a config element. If a config element is a string, the actual config element is returned. For elements of a dictionary type, the @@ -272,7 +324,6 @@ def _remove_invalid_requested_config(self, invalid_keys: set) -> None: invalid_keys (set): A set of keys to remove from `self.requested_config`. """ - for config_element in self.requested_config[:]: if ConfigParser._extract_element_name(config_element) in invalid_keys: self.requested_config.remove(config_element) @@ -285,7 +336,7 @@ def _expand_all(self) -> None: This method directly operates on `self.requested_config`. """ - pure_names = set([(name[1:] if name.startswith("!") else name) for name in self._requested_config_names if name != "all"]) + pure_names = set([self._get_pure_element_name(name) for name in self._requested_config_names if name != "all"]) self.requested_config.extend(list(self.valid_elements - pure_names)) self.requested_config.remove("all") @@ -301,19 +352,40 @@ def prepare_config(self) -> List[Union[str, dict]]: list: The parsed configuration. """ - if all((config_name.startswith("!") for config_name in self._requested_config_names)): - self.requested_config.insert(0, "all") - if "all" in self.requested_config: - self._expand_all() + # NOTE alternative approach to use is element included logic + # loop over valid elements and call is_element_included with self._requested_config_names as config param + # if element should be included; + # get element from original requested_config and put it to final config + # (need a seperate method for this to fetch config element with name if element is a dict) + # if element couldnot be found in requested_config put pure name in final config + # finally remove the 'all' keyword from requested_config if exists + + # with this approach we change the prepare config logic completely and dont use expand all + # but is element included has `is_all_exclusive` call inside and it can make performance worse + # to call it for each element. + + # for valid_element in self.valid_elements: + # if self.is_element_included(valid_element, self._requested_config_names): + # # get requested_config element with element name ?? + + ##### existing approach + # if self.is_all_exclusive(self._requested_config_names): + # self.requested_config.insert(0, "all") - final_configs = [] + # # if all((config_name.startswith("!") for config_name in self._requested_config_names)): + # # self.requested_config.insert(0, "all") - for config_element in self.requested_config: - if not ConfigParser._extract_element_name(config_element).startswith("!"): - final_configs.append(config_element) + # if "all" in self.requested_config: + # self._expand_all() - return final_configs + # final_configs = [] + + # for config_element in self.requested_config: + # if not ConfigParser._extract_element_name(config_element).startswith("!"): + # final_configs.append(config_element) + + # return final_configs def interpret_yes_no(boolstr: str) -> bool: @@ -337,6 +409,17 @@ def interpret_yes_no(boolstr: str) -> bool: return True if boolstr == "yes" else False +def get_all_dict_keys(nested_dict: dict) -> List: + """Get all keys for a nested dictionary in a recursive way. + + # TODO docstring + """ + keys = [] + for key, value in nested_dict.items(): + keys.append(key) + if isinstance(value, dict): + keys.extend(get_all_dict_keys(value)) + return keys def printer(report: dict, indent_level: int = 0) -> None: # pragma: no cover - exclude from pytest coverage """Print reports in human friendly format.