Skip to content

Commit

Permalink
Update type hints in upgrade.py
Browse files Browse the repository at this point in the history
  • Loading branch information
cdot65 committed Jan 15, 2024
1 parent ef7d82c commit 40f7d3f
Showing 1 changed file with 49 additions and 48 deletions.
97 changes: 49 additions & 48 deletions upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, Tuple, Union
from typing import Dict, List, Tuple, Union

# Palo Alto Networks PAN-OS imports
import panos
Expand Down Expand Up @@ -473,7 +473,9 @@ def parse_version(version: str) -> Tuple[int, int, int, int]:

upgrade_needed = current_version < target_version
if upgrade_needed:
logging.info("Upgrade is required.")
logging.info(
f"Confirmed that moving from {firewall.version} to {target_major}.{target_minor}.{target_maintenance} is an upgrade"
)
return

else:
Expand Down Expand Up @@ -721,7 +723,7 @@ def run_assurance(
firewall: Firewall,
hostname: str,
operation_type: str,
action: str,
actions: List[str],
config: Dict[str, Union[str, int, float, bool]],
) -> Union[AssuranceReport, None]:
"""
Expand All @@ -738,8 +740,8 @@ def run_assurance(
An instance of the Firewall class representing the firewall to operate on.
operation_type : str
The type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'.
action : str
The specific action to be performed within the operation type.
actions : List[str]
A list of specific actions to be performed within the operation type.
config : Dict[str, Union[str, int, float, bool]]
Configuration settings for the specified action.
Expand All @@ -763,48 +765,40 @@ def run_assurance(
assurance_firewall = FirewallProxy(firewall)

results = None
passed = True

if operation_type == "readiness_check":
if action not in AssuranceOptions.READINESS_CHECKS:
logging.error(f"Invalid action for readiness check: {action}")
return
logging.info(f"Performing readiness check: {action}")
for action in actions:
if action not in AssuranceOptions.READINESS_CHECKS:
logging.error(f"Invalid action for readiness check: {action}")
return

checks = CheckFirewall(assurance_firewall)
logging.info(f"Performing readiness check: {action}")

# check if arp entry exists
checks_configuration = [{action: config}]
checks = CheckFirewall(assurance_firewall)
checks_configuration = {action: config}

# run checks
try:
logging.info("Running readiness checks...")
results = checks.run_readiness_checks(checks_configuration)
logging.debug(results)
for check in checks_configuration:
check_name = list(check.keys())[0]
passed = passed & results[check_name]["state"]

if not results[check_name]["state"]:
logging.info(
"FAILED: %s - %s", check_name, results[check_name]["reason"]
)

except Exception as e:
logging.error("Error running readiness checks: %s", e)
return
try:
logging.info("Running readiness checks...")
result = checks.run_readiness_check(checks_configuration)
if result["state"]:
logging.info(f"Passed: {action}")
else:
logging.error(f"FAILED: {action} - {result['reason']}")
results = results or {}
results.update({action: result})
except Exception as e:
logging.error(f"Error running readiness checks: {e}")
return

logging.info("Completed checks successfully!")

elif operation_type == "state_snapshot":
actions = action.split(",")

snapshot_node = CheckFirewall(assurance_firewall)

# validate each type of action
for each in actions:
if each not in AssuranceOptions.STATE_SNAPSHOTS:
logging.error(f"Invalid action for state snapshot: {each}")
for action in actions:
if action not in AssuranceOptions.STATE_SNAPSHOTS:
logging.error(f"Invalid action for state snapshot: {action}")
return

# take snapshots
Expand All @@ -820,15 +814,16 @@ def run_assurance(
return None

except Exception as e:
logging.error("Error running readiness checks: %s", e)
logging.error("Error running snapshots: %s", e)
return

elif operation_type == "report":
if action not in AssuranceOptions.REPORTS:
logging.error(f"Invalid action for report: {action}")
return
logging.info(f"Generating report: {action}")
# result = getattr(Report(firewall), action)(**config)
for action in actions:
if action not in AssuranceOptions.REPORTS:
logging.error(f"Invalid action for report: {action}")
return
logging.info(f"Generating report: {action}")
# result = getattr(Report(firewall), action)(**config)

else:
logging.error(f"Invalid operation type: {operation_type}")
Expand Down Expand Up @@ -939,14 +934,22 @@ def main() -> None:
logging.info(f"{args['target_version']} has been downloaded.")

# Begin collecting network state information with panos-upgrade-assurance
logging.info("Collecting network state information...")
logging.info("Taking a pre-upgrade snapshot of network state information...")
if image_downloaded:
# Use the modified run_assurance function
assurance_report = run_assurance(
firewall,
firewall_details.hostname,
operation_type="state_snapshot",
action="arp_table,content_version,ip_sec_tunnels,license,nics,routes,session_stats",
actions=[
"arp_table",
"content_version",
"ip_sec_tunnels",
"license",
"nics",
"routes",
"session_stats",
],
config={},
)

Expand All @@ -967,11 +970,9 @@ def main() -> None:
else:
logging.error("Failed to create Assurance Report")

logging.info(f"Network state information collected from {firewall.serial}")

logging.info(
f"Network state information collected from {firewall_details.hostname}"
)
logging.info(
f"Pre-Upgrade network state snapshot collected from {firewall_details.hostname}, saved to {file_path}"
)


if __name__ == "__main__":
Expand Down

0 comments on commit 40f7d3f

Please sign in to comment.