Skip to content

Commit

Permalink
draft: refactor snapshot compare and config parser
Browse files Browse the repository at this point in the history
  • Loading branch information
alperenkose committed Sep 6, 2023
1 parent 6d7c0ac commit ceaa31c
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 69 deletions.
4 changes: 2 additions & 2 deletions examples/report/fw2.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
"num-gtpu-pending": "0"
},
"content_version": {
"version": "8647-7730"
"version": "8647-7740"
},
"arp_table": {},
"license": {
Expand Down Expand Up @@ -184,4 +184,4 @@
"ethernet1/2": "up",
"ethernet1/3": "up"
}
}
}
104 changes: 85 additions & 19 deletions examples/report/snapshot_load_compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ def load_snap(fname: str) -> dict:


if __name__ == "__main__":
snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")}
# snapshots = {"fw1": load_snap("fw1.snapshot"), "fw2": load_snap("fw2.snapshot")}
# snapshots = {"fw1": load_snap("arp_table_left.snapshot"), "fw2": load_snap("arp_table_right.snapshot")}
# snapshots = {"fw1": load_snap("lic-1.json"), "fw2": load_snap("lic-2.json")}
snapshots = {"fw1": load_snap("lic-5.json"), "fw2": load_snap("lic-6.json")}


reports = [
"all",
{"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}},
{"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}},
{"nics": {"count_change_threshold": 10}},
{"license": {"properties": ["!serial"]}},
# "all",
# {"ip_sec_tunnels": {"properties": ["state"], "count_change_threshold": 5}},
# {"arp_table": {"properties": ["!ttl"], "count_change_threshold": 10}},
# {"nics": {"count_change_threshold": 10}},
# {"license": {"properties": ["!serial"]}},

# {"license": {
# "properties": ["!serial", "!issued", "!authcode", "!expires", "!custom", "non-existing"] # exclude higher level
Expand All @@ -37,30 +41,92 @@ def load_snap(fname: str) -> dict:
# "properties": ["serial", "non-existing"] # compare only requested
# }},
# {"license": {
# "properties": ["!issued", "all"] # compare all except
# "properties": ["custom"] # if key exists in some sub-dicts, it will compare all the keys for the other dicts since this will be treated as non-existing key and ignored! NOTE: now it only ignores top level for added/missing
# }},
# {"license": {
# "properties": ["!issued", "serial"] # skip one and compare specific ones
# "properties": ["!issued", "all"] # compare all except
# }},
# {"license": {
# "properties": ["Logging Service"] # even works for parent level
# "properties": ["!issued", "serial"] # skip one and compare specific ones
# }},

# {"license": {
# "properties": ["Logging Service", "!custom"] # combination with parent
# }},
# "!license",
# "license",
# {"license": {
# "properties": ["!Logging Service","!X feature"]
# }},

######## Top level keys - not intented but works
# {"license": {
# "properties": ["Logging Service"] # even works for parent level
# }},
# {"license": {
# "properties": ["!Logging Service"] # even works for parent level
# }},
# {"license": {
# "properties": ["issued", "PAN-DB URL Filtering"] # even works for parent level - but without multi level
# }},
# {"license": {
# "properties": ["X feature"]
# }},
# {"license": {
# "properties": ["!X feature"]
# }},
# {"license": {
# "properties": ["C feature"]
# }},

######## 1st and 2nd level keys
# {"license": {
# "properties": ["!_Log_Storage_TB"] # works..
# }},
# {"license": {
# "properties": ["_Log_Storage_TB"] # works
# }},
# {"license": {
# "properties": ["_Log_Storage_TB","issued"] # works
# }},
# {"license": {
# "properties": ["serial", "!logtype"] # works
# }},
# {"license": {
# "properties": ["custom"] # works
# }},
# {"license": {
# "properties": ["!custom"] # works
# }},
# {"license": {
# "properties": ["alogging"] # works
# }},
# {"license": {
# "properties": ["blogging"] # works
# }},
# {"license": {
# "properties": ["something"] # since no such key is there it passes - works
# }},
# {"license": {
# "properties": ["all"] # works
# }},
# {"license": {
# "properties": ["!logtype"] # works
# }},
{"license": {
"properties": ["!Logging Service", "issued"] # works
}},

{"routes": {"properties": ["!flags"], "count_change_threshold": 10}},
"!content_version",
{
"session_stats": {
"thresholds": [
{"num-max": 10},
{"num-tcp": 10},
]
}
},
# {"routes": {"properties": ["!flags"], "count_change_threshold": 10}},
# "!content_version",
# {
# "session_stats": {
# "thresholds": [
# {"num-max": 10},
# {"num-tcp": 10},
# ]
# }
# },
]

compare = SnapshotCompare(
Expand Down
168 changes: 138 additions & 30 deletions panos_upgrade_assurance/snapshot_compare.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional, Union, List, Dict
from panos_upgrade_assurance.utils import ConfigParser, SnapType
from panos_upgrade_assurance.utils import ConfigParser, SnapType, get_all_dict_keys
from panos_upgrade_assurance import exceptions

from itertools import chain

class SnapshotCompare:
"""Class comparing snapshots of Firewall Nodes.
Expand Down Expand Up @@ -227,6 +227,49 @@ def calculate_change_percentage(
result["passed"] = False
return result



# @staticmethod
# def get_all_keys(nested_dict): # NOTE keep it in snapshotcompare
# keys = []
# for key, value in nested_dict.items():
# keys.append(key)
# if isinstance(value, dict):
# keys.extend(SnapshotCompare.get_all_keys(value))
# return keys

@staticmethod
def is_key_comparison_allowed(key, properties):
"""
key str
properties List
"""
if properties is None:
return True

if key in properties:
return True
elif ("!"+key) in properties:
return False
elif "all" in properties:
return True
elif all((element.startswith("!") for element in properties)): # all '!' or empty list
return True

return False


@staticmethod
def is_key_excluded_in_properties(key, properties):
if properties is None:
return False

if ("!"+key) in properties:
return True

return False


@staticmethod
def calculate_diff_on_dicts(
left_side_to_compare: Dict[str, Union[str, dict]],
Expand Down Expand Up @@ -392,48 +435,113 @@ def calculate_diff_on_dicts(
changed=dict(passed=True, changed_raw={}),
)

chain_unique_set = lambda set1, set2: set(chain(set1,set2))

missing = left_side_to_compare.keys() - right_side_to_compare.keys()
if missing:
result["missing"]["passed"] = False
for key in missing:
for key in missing:
if SnapshotCompare.is_key_comparison_allowed(key, properties):
result["missing"]["missing_keys"].append(key)
result["missing"]["passed"] = False

# if missing:
# result["missing"]["passed"] = False
# for key in missing:
# result["missing"]["missing_keys"].append(key)

added = right_side_to_compare.keys() - left_side_to_compare.keys()
if added:
result["added"]["passed"] = False
for key in added:
for key in added:
if SnapshotCompare.is_key_comparison_allowed(key, properties):
result["added"]["added_keys"].append(key)
result["added"]["passed"] = False

# if added:
# result["added"]["passed"] = False
# for key in added:
# result["added"]["added_keys"].append(key)

common_keys = left_side_to_compare.keys() & right_side_to_compare.keys()
if common_keys:
keys_to_check = (
ConfigParser(valid_elements=set(common_keys),
requested_config=properties,
ignore_invalid_config=True).prepare_config()
if properties
else common_keys
)
print("common keys:",common_keys)
# keys_to_check = (
# ConfigParser(valid_elements=set(common_keys),
# requested_config=properties,
# ignore_invalid_config=True).prepare_config()
# if properties
# else common_keys
# )
keys_to_check = common_keys # TODO

item_changed = False
for key in keys_to_check:
print(f"Checking {key} in {left_side_to_compare}")
# if key == "PAN-DB URL Filtering":
# import pdb; pdb.set_trace()


if right_side_to_compare[key] != left_side_to_compare[key]:
if isinstance(left_side_to_compare[key], str):
result["changed"]["changed_raw"][key] = dict(
left_snap=left_side_to_compare[key],
right_snap=right_side_to_compare[key],
)
item_changed = True
elif isinstance(left_side_to_compare[key], dict):
nested_results = SnapshotCompare.calculate_diff_on_dicts(
left_side_to_compare=left_side_to_compare[key],
right_side_to_compare=right_side_to_compare[key],
properties=properties,
)

SnapshotCompare.calculate_passed(nested_results)
if not nested_results["passed"]:
result["changed"]["changed_raw"][key] = nested_results

print(f"is key {key} comparison allowed with {properties}")

if SnapshotCompare.is_key_comparison_allowed(key, properties):
result["changed"]["changed_raw"][key] = dict(
left_snap=left_side_to_compare[key],
right_snap=right_side_to_compare[key],
)
item_changed = True

elif isinstance(left_side_to_compare[key], dict):

nested_keys_within_common_key = chain_unique_set(get_all_dict_keys(left_side_to_compare[key]),
get_all_dict_keys(right_side_to_compare[key]))
print("nested keys within common key:",nested_keys_within_common_key)

# if is key excluded in properties
# continue
#
# if key in properties
# ##maybe remove key from properties
# ##if any nested key in properties; call with properties
# ##else call without properties
# recursive call without properties - DO NOT ALLOW MULTI LEVEL FILTERING..
#
# if any nested key in properties
# ##filter properties in nested level
# recursive call
#

if SnapshotCompare.is_key_excluded_in_properties(key, properties): # NOTE should check for exclude explicitly
continue # skip to the next key


if properties and key in properties:
# call without properties - DO NOT ALLOW MULTI LEVEL FILTERING..
nested_results = SnapshotCompare.calculate_diff_on_dicts(
left_side_to_compare=left_side_to_compare[key],
right_side_to_compare=right_side_to_compare[key],
)
else:
nested_results = SnapshotCompare.calculate_diff_on_dicts(
left_side_to_compare=left_side_to_compare[key],
right_side_to_compare=right_side_to_compare[key],
properties=properties,
)

# if any(
# SnapshotCompare.is_key_comparison_allowed(nested_key, properties)
# for nested_key in nested_keys_within_common_key
# ):
# nested_results = SnapshotCompare.calculate_diff_on_dicts(
# left_side_to_compare=left_side_to_compare[key],
# right_side_to_compare=right_side_to_compare[key],
# properties=properties,
# )

if nested_results: # TODO remove this
SnapshotCompare.calculate_passed(nested_results)
if not nested_results["passed"]:
result["changed"]["changed_raw"][key] = nested_results
item_changed = True
else:
raise exceptions.WrongDataTypeException(f"Unknown value format for key {key}.")
result["changed"]["passed"] = not item_changed
Expand Down
Loading

0 comments on commit ceaa31c

Please sign in to comment.